Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * matview.c
4 : * materialized view support
5 : *
6 : * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
7 : * Portions Copyright (c) 1994, Regents of the University of California
8 : *
9 : *
10 : * IDENTIFICATION
11 : * src/backend/commands/matview.c
12 : *
13 : *-------------------------------------------------------------------------
14 : */
15 : #include "postgres.h"
16 :
17 : #include "access/genam.h"
18 : #include "access/heapam.h"
19 : #include "access/htup_details.h"
20 : #include "access/multixact.h"
21 : #include "access/tableam.h"
22 : #include "access/xact.h"
23 : #include "catalog/indexing.h"
24 : #include "catalog/namespace.h"
25 : #include "catalog/pg_am.h"
26 : #include "catalog/pg_opclass.h"
27 : #include "commands/cluster.h"
28 : #include "commands/matview.h"
29 : #include "commands/tablecmds.h"
30 : #include "commands/tablespace.h"
31 : #include "executor/executor.h"
32 : #include "executor/spi.h"
33 : #include "miscadmin.h"
34 : #include "pgstat.h"
35 : #include "rewrite/rewriteHandler.h"
36 : #include "storage/lmgr.h"
37 : #include "tcop/tcopprot.h"
38 : #include "utils/builtins.h"
39 : #include "utils/lsyscache.h"
40 : #include "utils/rel.h"
41 : #include "utils/snapmgr.h"
42 : #include "utils/syscache.h"
43 :
44 :
45 : typedef struct
46 : {
47 : DestReceiver pub; /* publicly-known function pointers */
48 : Oid transientoid; /* OID of new heap into which to store */
49 : /* These fields are filled by transientrel_startup: */
50 : Relation transientrel; /* relation to write to */
51 : CommandId output_cid; /* cmin to insert in output tuples */
52 : int ti_options; /* table_tuple_insert performance options */
53 : BulkInsertState bistate; /* bulk insert state */
54 : } DR_transientrel;
55 :
56 : static int matview_maintenance_depth = 0;
57 :
58 : static void transientrel_startup(DestReceiver *self, int operation, TupleDesc typeinfo);
59 : static bool transientrel_receive(TupleTableSlot *slot, DestReceiver *self);
60 : static void transientrel_shutdown(DestReceiver *self);
61 : static void transientrel_destroy(DestReceiver *self);
62 : static uint64 refresh_matview_datafill(DestReceiver *dest, Query *query,
63 : const char *queryString, bool is_create);
64 : static void refresh_by_match_merge(Oid matviewOid, Oid tempOid, Oid relowner,
65 : int save_sec_context);
66 : static void refresh_by_heap_swap(Oid matviewOid, Oid OIDNewHeap, char relpersistence);
67 : static bool is_usable_unique_index(Relation indexRel);
68 : static void OpenMatViewIncrementalMaintenance(void);
69 : static void CloseMatViewIncrementalMaintenance(void);
70 :
71 : /*
72 : * SetMatViewPopulatedState
73 : * Mark a materialized view as populated, or not.
74 : *
75 : * NOTE: caller must be holding an appropriate lock on the relation.
76 : */
77 : void
78 618 : SetMatViewPopulatedState(Relation relation, bool newstate)
79 : {
80 : Relation pgrel;
81 : HeapTuple tuple;
82 :
83 : Assert(relation->rd_rel->relkind == RELKIND_MATVIEW);
84 :
85 : /*
86 : * Update relation's pg_class entry. Crucial side-effect: other backends
87 : * (and this one too!) are sent SI message to make them rebuild relcache
88 : * entries.
89 : */
90 618 : pgrel = table_open(RelationRelationId, RowExclusiveLock);
91 618 : tuple = SearchSysCacheCopy1(RELOID,
92 : ObjectIdGetDatum(RelationGetRelid(relation)));
93 618 : if (!HeapTupleIsValid(tuple))
94 0 : elog(ERROR, "cache lookup failed for relation %u",
95 : RelationGetRelid(relation));
96 :
97 618 : ((Form_pg_class) GETSTRUCT(tuple))->relispopulated = newstate;
98 :
99 618 : CatalogTupleUpdate(pgrel, &tuple->t_self, tuple);
100 :
101 618 : heap_freetuple(tuple);
102 618 : table_close(pgrel, RowExclusiveLock);
103 :
104 : /*
105 : * Advance command counter to make the updated pg_class row locally
106 : * visible.
107 : */
108 618 : CommandCounterIncrement();
109 618 : }
110 :
111 : /*
112 : * ExecRefreshMatView -- execute a REFRESH MATERIALIZED VIEW command
113 : *
114 : * If WITH NO DATA was specified, this is effectively like a TRUNCATE;
115 : * otherwise it is like a TRUNCATE followed by an INSERT using the SELECT
116 : * statement associated with the materialized view. The statement node's
117 : * skipData field shows whether the clause was used.
118 : */
119 : ObjectAddress
120 268 : ExecRefreshMatView(RefreshMatViewStmt *stmt, const char *queryString,
121 : QueryCompletion *qc)
122 : {
123 : Oid matviewOid;
124 : LOCKMODE lockmode;
125 :
126 : /* Determine strength of lock needed. */
127 268 : lockmode = stmt->concurrent ? ExclusiveLock : AccessExclusiveLock;
128 :
129 : /*
130 : * Get a lock until end of transaction.
131 : */
132 268 : matviewOid = RangeVarGetRelidExtended(stmt->relation,
133 : lockmode, 0,
134 : RangeVarCallbackMaintainsTable,
135 : NULL);
136 :
137 458 : return RefreshMatViewByOid(matviewOid, false, stmt->skipData,
138 262 : stmt->concurrent, queryString, qc);
139 : }
140 :
141 : /*
142 : * RefreshMatViewByOid -- refresh materialized view by OID
143 : *
144 : * This refreshes the materialized view by creating a new table and swapping
145 : * the relfilenumbers of the new table and the old materialized view, so the OID
146 : * of the original materialized view is preserved. Thus we do not lose GRANT
147 : * nor references to this materialized view.
148 : *
149 : * If skipData is true, this is effectively like a TRUNCATE; otherwise it is
150 : * like a TRUNCATE followed by an INSERT using the SELECT statement associated
151 : * with the materialized view.
152 : *
153 : * Indexes are rebuilt too, via REINDEX. Since we are effectively bulk-loading
154 : * the new heap, it's better to create the indexes afterwards than to fill them
155 : * incrementally while we load.
156 : *
157 : * The matview's "populated" state is changed based on whether the contents
158 : * reflect the result set of the materialized view's query.
159 : *
160 : * This is also used to populate the materialized view created by CREATE
161 : * MATERIALIZED VIEW command.
 *
 * Parameters:
 *   matviewOid  - OID of the materialized view.  It is opened with NoLock
 *                 here, so the caller must already hold a suitable lock
 *                 (ExecRefreshMatView acquires one before calling).
 *   is_create   - true when called on behalf of CREATE MATERIALIZED VIEW.
 *                 It selects the command name used in messages and the
 *                 completion tag; it must be false when concurrent is true.
 *   skipData    - true to skip generating data and mark the matview as not
 *                 populated.
 *   concurrent  - true for CONCURRENTLY: the new data is built in a temp
 *                 table and merged via refresh_by_match_merge().  Requires a
 *                 populated matview with a usable unique index, and cannot
 *                 be combined with skipData.
 *   queryString - source text passed on to planning and execution of the
 *                 data-generating query.
 *   qc          - if not NULL, receives the command tag and the number of
 *                 rows processed.
 *
 * Returns the ObjectAddress of the materialized view.
162 : */
163 : ObjectAddress
164 624 : RefreshMatViewByOid(Oid matviewOid, bool is_create, bool skipData,
165 : bool concurrent, const char *queryString,
166 : QueryCompletion *qc)
167 : {
168 : Relation matviewRel;
169 : RewriteRule *rule;
170 : List *actions;
171 : Query *dataQuery;
172 : Oid tableSpace;
173 : Oid relowner;
174 : Oid OIDNewHeap;
175 624 : uint64 processed = 0;
176 : char relpersistence;
177 : Oid save_userid;
178 : int save_sec_context;
179 : int save_nestlevel;
180 : ObjectAddress address;
181 :
        /* NoLock: no new lock is taken here; see the matviewOid note above. */
182 624 : matviewRel = table_open(matviewOid, NoLock);
183 624 : relowner = matviewRel->rd_rel->relowner;
184 :
185 : /*
186 : * Switch to the owner's userid, so that any functions are run as that
187 : * user. Also lock down security-restricted operations and arrange to
188 : * make GUC variable changes local to this command.
189 : */
190 624 : GetUserIdAndSecContext(&save_userid, &save_sec_context);
191 624 : SetUserIdAndSecContext(relowner,
192 : save_sec_context | SECURITY_RESTRICTED_OPERATION);
193 624 : save_nestlevel = NewGUCNestLevel();
194 624 : RestrictSearchPath();
195 :
196 : /* Make sure it is a materialized view. */
197 624 : if (matviewRel->rd_rel->relkind != RELKIND_MATVIEW)
198 0 : ereport(ERROR,
199 : (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
200 : errmsg("\"%s\" is not a materialized view",
201 : RelationGetRelationName(matviewRel))));
202 :
203 : /* Check that CONCURRENTLY is not specified if not populated. */
204 624 : if (concurrent && !RelationIsPopulated(matviewRel))
205 0 : ereport(ERROR,
206 : (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
207 : errmsg("CONCURRENTLY cannot be used when the materialized view is not populated")));
208 :
209 : /* Check that conflicting options have not been specified. */
210 624 : if (concurrent && skipData)
211 6 : ereport(ERROR,
212 : (errcode(ERRCODE_SYNTAX_ERROR),
213 : errmsg("%s and %s options cannot be used together",
214 : "CONCURRENTLY", "WITH NO DATA")));
215 :
216 : /*
217 : * Check that everything is correct for a refresh. Problems at this point
218 : * are internal errors, so elog is sufficient.
219 : */
220 618 : if (matviewRel->rd_rel->relhasrules == false ||
221 618 : matviewRel->rd_rules->numLocks < 1)
222 0 : elog(ERROR,
223 : "materialized view \"%s\" is missing rewrite information",
224 : RelationGetRelationName(matviewRel));
225 :
226 618 : if (matviewRel->rd_rules->numLocks > 1)
227 0 : elog(ERROR,
228 : "materialized view \"%s\" has too many rules",
229 : RelationGetRelationName(matviewRel));
230 :
231 618 : rule = matviewRel->rd_rules->rules[0];
232 618 : if (rule->event != CMD_SELECT || !(rule->isInstead))
233 0 : elog(ERROR,
234 : "the rule for materialized view \"%s\" is not a SELECT INSTEAD OF rule",
235 : RelationGetRelationName(matviewRel));
236 :
237 618 : actions = rule->actions;
238 618 : if (list_length(actions) != 1)
239 0 : elog(ERROR,
240 : "the rule for materialized view \"%s\" is not a single action",
241 : RelationGetRelationName(matviewRel));
242 :
243 : /*
244 : * Check that there is a unique index with no WHERE clause on one or more
245 : * columns of the materialized view if CONCURRENTLY is specified.
246 : */
247 618 : if (concurrent)
248 : {
249 78 : List *indexoidlist = RelationGetIndexList(matviewRel);
250 : ListCell *indexoidscan;
251 78 : bool hasUniqueIndex = false;
252 :
253 : Assert(!is_create);
254 :
        /* Stop at the first index that is_usable_unique_index() accepts. */
255 90 : foreach(indexoidscan, indexoidlist)
256 : {
257 84 : Oid indexoid = lfirst_oid(indexoidscan);
258 : Relation indexRel;
259 :
260 84 : indexRel = index_open(indexoid, AccessShareLock);
261 84 : hasUniqueIndex = is_usable_unique_index(indexRel);
262 84 : index_close(indexRel, AccessShareLock);
263 84 : if (hasUniqueIndex)
264 72 : break;
265 : }
266 :
267 78 : list_free(indexoidlist);
268 :
269 78 : if (!hasUniqueIndex)
270 6 : ereport(ERROR,
271 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
272 : errmsg("cannot refresh materialized view \"%s\" concurrently",
273 : quote_qualified_identifier(get_namespace_name(RelationGetNamespace(matviewRel)),
274 : RelationGetRelationName(matviewRel))),
275 : errhint("Create a unique index with no WHERE clause on one or more columns of the materialized view.")));
276 : }
277 :
278 : /*
279 : * The stored query was rewritten at the time of the MV definition, but
280 : * has not been scribbled on by the planner.
281 : */
282 612 : dataQuery = linitial_node(Query, actions);
283 :
284 : /*
285 : * Check for active uses of the relation in the current transaction, such
286 : * as open scans.
287 : *
288 : * NB: We count on this to protect us against problems with refreshing the
289 : * data using TABLE_INSERT_FROZEN.
290 : */
291 612 : CheckTableNotInUse(matviewRel,
292 : is_create ? "CREATE MATERIALIZED VIEW" :
293 : "REFRESH MATERIALIZED VIEW");
294 :
295 : /*
296 : * Tentatively mark the matview as populated or not (this will roll back
297 : * if we fail later).
298 : */
299 612 : SetMatViewPopulatedState(matviewRel, !skipData);
300 :
301 : /* Concurrent refresh builds new data in temp tablespace, and does diff. */
302 612 : if (concurrent)
303 : {
304 72 : tableSpace = GetDefaultTablespace(RELPERSISTENCE_TEMP, false);
305 72 : relpersistence = RELPERSISTENCE_TEMP;
306 : }
307 : else
308 : {
309 540 : tableSpace = matviewRel->rd_rel->reltablespace;
310 540 : relpersistence = matviewRel->rd_rel->relpersistence;
311 : }
312 :
313 : /*
314 : * Create the transient table that will receive the regenerated data. Lock
315 : * it against access by any other process until commit (by which time it
316 : * will be gone).
317 : */
318 1224 : OIDNewHeap = make_new_heap(matviewOid, tableSpace,
319 612 : matviewRel->rd_rel->relam,
320 : relpersistence, ExclusiveLock);
        /*
         * NOTE(review): the lock mode passed to make_new_heap() above
         * (ExclusiveLock) differs from the one asserted below
         * (AccessExclusiveLock).  Presumably make_new_heap() locks the new
         * heap itself in AccessExclusiveLock mode and uses its lockmode
         * argument for something else -- confirm against make_new_heap().
         */
321 : Assert(CheckRelationOidLockedByMe(OIDNewHeap, AccessExclusiveLock, false));
322 :
323 : /* Generate the data, if wanted. */
324 612 : if (!skipData)
325 : {
326 : DestReceiver *dest;
327 :
328 612 : dest = CreateTransientRelDestReceiver(OIDNewHeap);
329 612 : processed = refresh_matview_datafill(dest, dataQuery, queryString,
330 : is_create);
331 : }
332 :
333 : /* Make the matview match the newly generated data. */
334 570 : if (concurrent)
335 : {
        /*
         * refresh_by_match_merge() raises matview_maintenance_depth via
         * OpenMatViewIncrementalMaintenance() and lowers it again only on
         * normal completion.  Remember the current depth so that an error
         * thrown in between cannot leave the counter raised.
         */
336 72 : int old_depth = matview_maintenance_depth;
337 :
338 72 : PG_TRY();
339 : {
340 72 : refresh_by_match_merge(matviewOid, OIDNewHeap, relowner,
341 : save_sec_context);
342 : }
343 12 : PG_CATCH();
344 : {
345 12 : matview_maintenance_depth = old_depth;
346 12 : PG_RE_THROW();
347 : }
348 60 : PG_END_TRY();
349 : Assert(matview_maintenance_depth == old_depth);
350 : }
351 : else
352 : {
353 498 : refresh_by_heap_swap(matviewOid, OIDNewHeap, relpersistence);
354 :
355 : /*
356 : * Inform cumulative stats system about our activity: basically, we
357 : * truncated the matview and inserted some new data. (The concurrent
358 : * code path above doesn't need to worry about this because the
359 : * inserts and deletes it issues get counted by lower-level code.)
360 : */
361 492 : pgstat_count_truncate(matviewRel);
362 492 : if (!skipData)
363 492 : pgstat_count_heap_insert(matviewRel, processed);
364 : }
365 :
        /* NoLock: closing the relation here does not release its lock. */
366 552 : table_close(matviewRel, NoLock);
367 :
368 : /* Roll back any GUC changes */
369 552 : AtEOXact_GUC(false, save_nestlevel);
370 :
371 : /* Restore userid and security context */
372 552 : SetUserIdAndSecContext(save_userid, save_sec_context);
373 :
374 552 : ObjectAddressSet(address, RelationRelationId, matviewOid);
375 :
376 : /*
377 : * Save the rowcount so that pg_stat_statements can track the total number
378 : * of rows processed by REFRESH MATERIALIZED VIEW command. Note that we
379 : * still don't display the rowcount in the command completion tag output,
380 : * i.e., the display_rowcount flag of CMDTAG_REFRESH_MATERIALIZED_VIEW
381 : * command tag is left false in cmdtaglist.h. Otherwise, the change of
382 : * completion tag output might break applications using it.
383 : *
384 : * When called from CREATE MATERIALIZED VIEW command, the rowcount is
385 : * displayed with the command tag CMDTAG_SELECT.
386 : */
387 552 : if (qc)
388 540 : SetQueryCompletion(qc,
389 : is_create ? CMDTAG_SELECT : CMDTAG_REFRESH_MATERIALIZED_VIEW,
390 : processed);
391 :
392 552 : return address;
393 : }
394 :
395 : /*
396 : * refresh_matview_datafill
397 : *
398 : * Execute the given query, sending result rows to "dest" (which will
399 : * insert them into the target matview).
400 : *
401 : * Returns number of rows inserted.
402 : */
403 : static uint64
404 612 : refresh_matview_datafill(DestReceiver *dest, Query *query,
405 : const char *queryString, bool is_create)
406 : {
407 : List *rewritten;
408 : PlannedStmt *plan;
409 : QueryDesc *queryDesc;
410 : Query *copied_query;
411 : uint64 processed;
412 :
413 : /* Lock and rewrite, using a copy to preserve the original query. */
414 612 : copied_query = copyObject(query);
415 612 : AcquireRewriteLocks(copied_query, true, false);
416 612 : rewritten = QueryRewrite(copied_query);
417 :
418 : /* SELECT should never rewrite to more or less than one SELECT query */
419 612 : if (list_length(rewritten) != 1)
420 0 : elog(ERROR, "unexpected rewrite result for %s",
421 : is_create ? "CREATE MATERIALIZED VIEW " : "REFRESH MATERIALIZED VIEW");
422 612 : query = (Query *) linitial(rewritten);
423 :
424 : /* Check for user-requested abort. */
425 612 : CHECK_FOR_INTERRUPTS();
426 :
427 : /* Plan the query which will generate data for the refresh. */
428 612 : plan = pg_plan_query(query, queryString, CURSOR_OPT_PARALLEL_OK, NULL, NULL);
429 :
430 : /*
431 : * Use a snapshot with an updated command ID to ensure this query sees
432 : * results of any previously executed queries. (This could only matter if
433 : * the planner executed an allegedly-stable function that changed the
434 : * database contents, but let's do it anyway to be safe.)
435 : */
436 600 : PushCopiedSnapshot(GetActiveSnapshot());
437 600 : UpdateActiveSnapshotCommandId();
438 :
439 : /* Create a QueryDesc, redirecting output to our tuple receiver */
440 600 : queryDesc = CreateQueryDesc(plan, queryString,
441 : GetActiveSnapshot(), InvalidSnapshot,
442 : dest, NULL, NULL, 0);
443 :
444 : /* call ExecutorStart to prepare the plan for execution */
445 600 : ExecutorStart(queryDesc, 0);
446 :
447 : /* run the plan */
448 600 : ExecutorRun(queryDesc, ForwardScanDirection, 0);
449 :
450 570 : processed = queryDesc->estate->es_processed;
451 :
452 : /* and clean up */
453 570 : ExecutorFinish(queryDesc);
454 570 : ExecutorEnd(queryDesc);
455 :
456 570 : FreeQueryDesc(queryDesc);
457 :
458 570 : PopActiveSnapshot();
459 :
460 570 : return processed;
461 : }
462 :
463 : DestReceiver *
464 612 : CreateTransientRelDestReceiver(Oid transientoid)
465 : {
466 612 : DR_transientrel *self = (DR_transientrel *) palloc0(sizeof(DR_transientrel));
467 :
468 612 : self->pub.receiveSlot = transientrel_receive;
469 612 : self->pub.rStartup = transientrel_startup;
470 612 : self->pub.rShutdown = transientrel_shutdown;
471 612 : self->pub.rDestroy = transientrel_destroy;
472 612 : self->pub.mydest = DestTransientRel;
473 612 : self->transientoid = transientoid;
474 :
475 612 : return (DestReceiver *) self;
476 : }
477 :
478 : /*
479 : * transientrel_startup --- executor startup
480 : */
481 : static void
482 600 : transientrel_startup(DestReceiver *self, int operation, TupleDesc typeinfo)
483 : {
484 600 : DR_transientrel *myState = (DR_transientrel *) self;
485 : Relation transientrel;
486 :
487 600 : transientrel = table_open(myState->transientoid, NoLock);
488 :
489 : /*
490 : * Fill private fields of myState for use by later routines
491 : */
492 600 : myState->transientrel = transientrel;
493 600 : myState->output_cid = GetCurrentCommandId(true);
494 600 : myState->ti_options = TABLE_INSERT_SKIP_FSM | TABLE_INSERT_FROZEN;
495 600 : myState->bistate = GetBulkInsertState();
496 :
497 : /*
498 : * Valid smgr_targblock implies something already wrote to the relation.
499 : * This may be harmless, but this function hasn't planned for it.
500 : */
501 : Assert(RelationGetTargetBlock(transientrel) == InvalidBlockNumber);
502 600 : }
503 :
504 : /*
505 : * transientrel_receive --- receive one tuple
506 : */
507 : static bool
508 4038 : transientrel_receive(TupleTableSlot *slot, DestReceiver *self)
509 : {
510 4038 : DR_transientrel *myState = (DR_transientrel *) self;
511 :
512 : /*
513 : * Note that the input slot might not be of the type of the target
514 : * relation. That's supported by table_tuple_insert(), but slightly less
515 : * efficient than inserting with the right slot - but the alternative
516 : * would be to copy into a slot of the right type, which would not be
517 : * cheap either. This also doesn't allow accessing per-AM data (say a
518 : * tuple's xmin), but since we don't do that here...
519 : */
520 :
521 4038 : table_tuple_insert(myState->transientrel,
522 : slot,
523 : myState->output_cid,
524 : myState->ti_options,
525 4038 : myState->bistate);
526 :
527 : /* We know this is a newly created relation, so there are no indexes */
528 :
529 4038 : return true;
530 : }
531 :
532 : /*
533 : * transientrel_shutdown --- executor end
534 : */
535 : static void
536 570 : transientrel_shutdown(DestReceiver *self)
537 : {
538 570 : DR_transientrel *myState = (DR_transientrel *) self;
539 :
540 570 : FreeBulkInsertState(myState->bistate);
541 :
542 570 : table_finish_bulk_insert(myState->transientrel, myState->ti_options);
543 :
544 : /* close transientrel, but keep lock until commit */
545 570 : table_close(myState->transientrel, NoLock);
546 570 : myState->transientrel = NULL;
547 570 : }
548 :
549 : /*
550 : * transientrel_destroy --- release DestReceiver object
551 : */
552 : static void
553 0 : transientrel_destroy(DestReceiver *self)
554 : {
555 0 : pfree(self);
556 0 : }
557 :
558 : /*
559 : * refresh_by_match_merge
560 : *
561 : * Refresh a materialized view with transactional semantics, while allowing
562 : * concurrent reads.
563 : *
564 : * This is called after a new version of the data has been created in a
565 : * temporary table. It performs a full outer join against the old version of
566 : * the data, producing "diff" results. This join cannot work if there are any
567 : * duplicated rows in either the old or new versions, in the sense that every
568 : * column would compare as equal between the two rows. It does work correctly
569 : * in the face of rows which have at least one NULL value, with all non-NULL
570 : * columns equal. The behavior of NULLs on equality tests and on UNIQUE
571 : * indexes turns out to be quite convenient here; the tests we need to make
572 : * are consistent with default behavior. If there is at least one UNIQUE
573 : * index on the materialized view, we have exactly the guarantee we need.
574 : *
575 : * The temporary table used to hold the diff results contains just the TID of
576 : * the old record (if matched) and the ROW from the new table as a single
577 : * column of complex record type (if matched).
578 : *
579 : * Once we have the diff table, we perform set-based DELETE and INSERT
580 : * operations against the materialized view, and discard both temporary
581 : * tables.
582 : *
583 : * Everything from the generation of the new data to applying the differences
584 : * takes place under cover of an ExclusiveLock, since it seems as though we
585 : * would want to prohibit not only concurrent REFRESH operations, but also
586 : * incremental maintenance. It also doesn't seem reasonable or safe to allow
587 : * SELECT FOR UPDATE or SELECT FOR SHARE on rows being updated or deleted by
588 : * this command.
 *
 * Parameters:
 *   matviewOid       - OID of the materialized view being refreshed.
 *   tempOid          - OID of the temporary table holding the new data.
 *   relowner         - owner of the materialized view; used as the userid
 *                      whenever the security context is switched here.
 *   save_sec_context - the caller's saved security context flags, to which
 *                      SECURITY_LOCAL_USERID_CHANGE or
 *                      SECURITY_RESTRICTED_OPERATION is OR'ed below.
 *
 * Both relations are opened with NoLock, so the caller must already hold
 * suitable locks on them.
 *
 * If an error is thrown between OpenMatViewIncrementalMaintenance() and
 * CloseMatViewIncrementalMaintenance(), matview_maintenance_depth is left
 * raised; the caller (RefreshMatViewByOid) resets it in its PG_CATCH block.
589 : */
590 : static void
591 72 : refresh_by_match_merge(Oid matviewOid, Oid tempOid, Oid relowner,
592 : int save_sec_context)
593 : {
594 : StringInfoData querybuf;
595 : Relation matviewRel;
596 : Relation tempRel;
597 : char *matviewname;
598 : char *tempname;
599 : char *diffname;
600 : char *temprelname;
601 : char *diffrelname;
602 : char *nsp;
603 : TupleDesc tupdesc;
604 : bool foundUniqueIndex;
605 : List *indexoidlist;
606 : ListCell *indexoidscan;
607 : int16 relnatts;
608 : Oid *opUsedForQual;
609 :
610 72 : initStringInfo(&querybuf);
611 72 : matviewRel = table_open(matviewOid, NoLock);
612 72 : matviewname = quote_qualified_identifier(get_namespace_name(RelationGetNamespace(matviewRel)),
613 72 : RelationGetRelationName(matviewRel));
614 72 : tempRel = table_open(tempOid, NoLock);
615 :
616 : /*
617 : * Build qualified names of the temporary table and the diff table. The
618 : * only difference between them is the "_2" suffix on the diff table name.
619 : */
620 72 : nsp = get_namespace_name(RelationGetNamespace(tempRel));
621 72 : temprelname = RelationGetRelationName(tempRel);
622 72 : diffrelname = psprintf("%s_2", temprelname);
623 :
624 72 : tempname = quote_qualified_identifier(nsp, temprelname);
625 72 : diffname = quote_qualified_identifier(nsp, diffrelname);
626 :
627 72 : relnatts = RelationGetNumberOfAttributes(matviewRel);
628 :
629 : /* Open SPI context. */
630 72 : SPI_connect();
631 :
632 : /* Analyze the temp table with the new contents. */
633 72 : appendStringInfo(&querybuf, "ANALYZE %s", tempname);
634 72 : if (SPI_exec(querybuf.data, 0) != SPI_OK_UTILITY)
635 0 : elog(ERROR, "SPI_exec failed: %s", querybuf.data);
636 :
637 : /*
638 : * We need to ensure that there are not duplicate rows without NULLs in
639 : * the new data set before we can count on the "diff" results. Check for
640 : * that in a way that allows showing the first duplicated row found. Even
641 : * after we pass this test, a unique index on the materialized view may
642 : * find a duplicate key problem.
643 : *
644 : * Note: here and below, we use "tablename.*::tablerowtype" as a hack to
645 : * keep ".*" from being expanded into multiple columns in a SELECT list.
646 : * Compare ruleutils.c's get_variable().
647 : */
648 72 : resetStringInfo(&querybuf);
649 72 : appendStringInfo(&querybuf,
650 : "SELECT newdata.*::%s FROM %s newdata "
651 : "WHERE newdata.* IS NOT NULL AND EXISTS "
652 : "(SELECT 1 FROM %s newdata2 WHERE newdata2.* IS NOT NULL "
653 : "AND newdata2.* OPERATOR(pg_catalog.*=) newdata.* "
654 : "AND newdata2.ctid OPERATOR(pg_catalog.<>) "
655 : "newdata.ctid)",
656 : tempname, tempname, tempname);
        /*
         * Ask for at most one row: a single duplicated row is all that the
         * error report below prints.
         */
657 72 : if (SPI_execute(querybuf.data, false, 1) != SPI_OK_SELECT)
658 0 : elog(ERROR, "SPI_exec failed: %s", querybuf.data);
659 72 : if (SPI_processed > 0)
660 : {
661 : /*
662 : * Note that this ereport() is returning data to the user. Generally,
663 : * we would want to make sure that the user has been granted access to
664 : * this data. However, REFRESH MAT VIEW is only able to be run by the
665 : * owner of the mat view (or a superuser) and therefore there is no
666 : * need to check for access to data in the mat view.
667 : */
668 6 : ereport(ERROR,
669 : (errcode(ERRCODE_CARDINALITY_VIOLATION),
670 : errmsg("new data for materialized view \"%s\" contains duplicate rows without any null columns",
671 : RelationGetRelationName(matviewRel)),
672 : errdetail("Row: %s",
673 : SPI_getvalue(SPI_tuptable->vals[0], SPI_tuptable->tupdesc, 1))));
674 : }
675 :
676 : /*
677 : * Create the temporary "diff" table.
678 : *
679 : * Temporarily switch out of the SECURITY_RESTRICTED_OPERATION context,
680 : * because you cannot create temp tables in SRO context. For extra
681 : * paranoia, add the composite type column only after switching back to
682 : * SRO context.
683 : */
684 66 : SetUserIdAndSecContext(relowner,
685 : save_sec_context | SECURITY_LOCAL_USERID_CHANGE);
686 66 : resetStringInfo(&querybuf);
687 66 : appendStringInfo(&querybuf,
688 : "CREATE TEMP TABLE %s (tid pg_catalog.tid)",
689 : diffname);
690 66 : if (SPI_exec(querybuf.data, 0) != SPI_OK_UTILITY)
691 0 : elog(ERROR, "SPI_exec failed: %s", querybuf.data);
692 66 : SetUserIdAndSecContext(relowner,
693 : save_sec_context | SECURITY_RESTRICTED_OPERATION);
694 66 : resetStringInfo(&querybuf);
695 66 : appendStringInfo(&querybuf,
696 : "ALTER TABLE %s ADD COLUMN newdata %s",
697 : diffname, tempname);
698 66 : if (SPI_exec(querybuf.data, 0) != SPI_OK_UTILITY)
699 0 : elog(ERROR, "SPI_exec failed: %s", querybuf.data);
700 :
701 : /* Start building the query for populating the diff table. */
702 66 : resetStringInfo(&querybuf);
703 66 : appendStringInfo(&querybuf,
704 : "INSERT INTO %s "
705 : "SELECT mv.ctid AS tid, newdata.*::%s AS newdata "
706 : "FROM %s mv FULL JOIN %s newdata ON (",
707 : diffname, tempname, matviewname, tempname);
708 :
709 : /*
710 : * Get the list of index OIDs for the table from the relcache, and look up
711 : * each one in the pg_index syscache. We will test for equality on all
712 : * columns present in all unique indexes which only reference columns and
713 : * include all rows.
714 : */
715 66 : tupdesc = matviewRel->rd_att;
        /*
         * opUsedForQual[attnum - 1] holds the equality operator most
         * recently emitted for that column, or InvalidOid (zero, from
         * palloc0) if none has been emitted yet.
         */
716 66 : opUsedForQual = (Oid *) palloc0(sizeof(Oid) * relnatts);
717 66 : foundUniqueIndex = false;
718 :
719 66 : indexoidlist = RelationGetIndexList(matviewRel);
720 :
721 138 : foreach(indexoidscan, indexoidlist)
722 : {
723 72 : Oid indexoid = lfirst_oid(indexoidscan);
724 : Relation indexRel;
725 :
726 72 : indexRel = index_open(indexoid, RowExclusiveLock);
727 72 : if (is_usable_unique_index(indexRel))
728 : {
729 72 : Form_pg_index indexStruct = indexRel->rd_index;
730 72 : int indnkeyatts = indexStruct->indnkeyatts;
731 : oidvector *indclass;
732 : Datum indclassDatum;
733 : int i;
734 :
735 : /* Must get indclass the hard way. */
736 72 : indclassDatum = SysCacheGetAttrNotNull(INDEXRELID,
737 72 : indexRel->rd_indextuple,
738 : Anum_pg_index_indclass);
739 72 : indclass = (oidvector *) DatumGetPointer(indclassDatum);
740 :
741 : /* Add quals for all columns from this index. */
742 160 : for (i = 0; i < indnkeyatts; i++)
743 : {
744 88 : int attnum = indexStruct->indkey.values[i];
745 88 : Oid opclass = indclass->values[i];
746 88 : Form_pg_attribute attr = TupleDescAttr(tupdesc, attnum - 1);
747 88 : Oid attrtype = attr->atttypid;
748 : HeapTuple cla_ht;
749 : Form_pg_opclass cla_tup;
750 : Oid opfamily;
751 : Oid opcintype;
752 : Oid op;
753 : const char *leftop;
754 : const char *rightop;
755 :
756 : /*
757 : * Identify the equality operator associated with this index
758 : * column. First we need to look up the column's opclass.
759 : */
760 88 : cla_ht = SearchSysCache1(CLAOID, ObjectIdGetDatum(opclass));
761 88 : if (!HeapTupleIsValid(cla_ht))
762 0 : elog(ERROR, "cache lookup failed for opclass %u", opclass);
763 88 : cla_tup = (Form_pg_opclass) GETSTRUCT(cla_ht);
764 88 : opfamily = cla_tup->opcfamily;
765 88 : opcintype = cla_tup->opcintype;
766 88 : ReleaseSysCache(cla_ht);
767 :
768 88 : op = get_opfamily_member_for_cmptype(opfamily, opcintype, opcintype, COMPARE_EQ);
769 88 : if (!OidIsValid(op))
770 0 : elog(ERROR, "missing equality operator for (%u,%u) in opfamily %u",
771 : opcintype, opcintype, opfamily);
772 :
773 : /*
774 : * If we find the same column with the same equality semantics
775 : * in more than one index, we only need to emit the equality
776 : * clause once.
777 : *
778 : * Since we only remember the last equality operator, this
779 : * code could be fooled into emitting duplicate clauses given
780 : * multiple indexes with several different opclasses ... but
781 : * that's so unlikely it doesn't seem worth spending extra
782 : * code to avoid.
783 : */
784 88 : if (opUsedForQual[attnum - 1] == op)
785 0 : continue;
786 88 : opUsedForQual[attnum - 1] = op;
787 :
788 : /*
789 : * Actually add the qual, ANDed with any others.
790 : */
        /*
         * foundUniqueIndex doubles as "a qual has already been emitted", so
         * the " AND " separator is added before every qual but the first.
         */
791 88 : if (foundUniqueIndex)
792 28 : appendStringInfoString(&querybuf, " AND ");
793 :
794 88 : leftop = quote_qualified_identifier("newdata",
795 88 : NameStr(attr->attname));
796 88 : rightop = quote_qualified_identifier("mv",
797 88 : NameStr(attr->attname));
798 :
799 88 : generate_operator_clause(&querybuf,
800 : leftop, attrtype,
801 : op,
802 : rightop, attrtype);
803 :
804 88 : foundUniqueIndex = true;
805 : }
806 : }
807 :
808 : /* Keep the locks, since we're about to run DML which needs them. */
809 72 : index_close(indexRel, NoLock);
810 : }
811 :
812 66 : list_free(indexoidlist);
813 :
814 : /*
815 : * There must be at least one usable unique index on the matview.
816 : *
817 : * RefreshMatViewByOid() checks that after taking the exclusive lock on the
818 : * matview. So at least one unique index is guaranteed to exist here
819 : * because the lock is still being held. (One known exception is if a
820 : * function called as part of refreshing the matview drops the index.
821 : * That's a pretty silly thing to do.)
822 : */
823 66 : if (!foundUniqueIndex)
824 6 : ereport(ERROR,
825 : errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
826 : errmsg("could not find suitable unique index on materialized view \"%s\"",
827 : RelationGetRelationName(matviewRel)));
828 :
        /*
         * Close the ON (...) clause with a whole-row equality test, then
         * keep only the rows where one side of the full join is missing.
         */
829 60 : appendStringInfoString(&querybuf,
830 : " AND newdata.* OPERATOR(pg_catalog.*=) mv.*) "
831 : "WHERE newdata.* IS NULL OR mv.* IS NULL "
832 : "ORDER BY tid");
833 :
834 : /* Populate the temporary "diff" table. */
835 60 : if (SPI_exec(querybuf.data, 0) != SPI_OK_INSERT)
836 0 : elog(ERROR, "SPI_exec failed: %s", querybuf.data);
837 :
838 : /*
839 : * We have no further use for data from the "full-data" temp table, but we
840 : * must keep it around because its type is referenced from the diff table.
841 : */
842 :
843 : /* Analyze the diff table. */
844 60 : resetStringInfo(&querybuf);
845 60 : appendStringInfo(&querybuf, "ANALYZE %s", diffname);
846 60 : if (SPI_exec(querybuf.data, 0) != SPI_OK_UTILITY)
847 0 : elog(ERROR, "SPI_exec failed: %s", querybuf.data);
848 :
        /*
         * Raise the maintenance depth so that
         * MatViewIncrementalMaintenanceIsEnabled() reports true while the
         * DELETE and INSERT below run against the matview.
         */
849 60 : OpenMatViewIncrementalMaintenance();
850 :
851 : /* Deletes must come before inserts; do them first. */
852 60 : resetStringInfo(&querybuf);
853 60 : appendStringInfo(&querybuf,
854 : "DELETE FROM %s mv WHERE ctid OPERATOR(pg_catalog.=) ANY "
855 : "(SELECT diff.tid FROM %s diff "
856 : "WHERE diff.tid IS NOT NULL "
857 : "AND diff.newdata IS NULL)",
858 : matviewname, diffname);
859 60 : if (SPI_exec(querybuf.data, 0) != SPI_OK_DELETE)
860 0 : elog(ERROR, "SPI_exec failed: %s", querybuf.data);
861 :
862 : /* Inserts go last. */
863 60 : resetStringInfo(&querybuf);
864 60 : appendStringInfo(&querybuf,
865 : "INSERT INTO %s SELECT (diff.newdata).* "
866 : "FROM %s diff WHERE tid IS NULL",
867 : matviewname, diffname);
868 60 : if (SPI_exec(querybuf.data, 0) != SPI_OK_INSERT)
869 0 : elog(ERROR, "SPI_exec failed: %s", querybuf.data);
870 :
871 : /* We're done maintaining the materialized view. */
872 60 : CloseMatViewIncrementalMaintenance();
873 60 : table_close(tempRel, NoLock);
874 60 : table_close(matviewRel, NoLock);
875 :
876 : /* Clean up temp tables. */
877 60 : resetStringInfo(&querybuf);
878 60 : appendStringInfo(&querybuf, "DROP TABLE %s, %s", diffname, tempname);
879 60 : if (SPI_exec(querybuf.data, 0) != SPI_OK_UTILITY)
880 0 : elog(ERROR, "SPI_exec failed: %s", querybuf.data);
881 :
882 : /* Close SPI context. */
883 60 : if (SPI_finish() != SPI_OK_FINISH)
884 0 : elog(ERROR, "SPI_finish failed");
885 60 : }
886 :
887 : /*
888 : * Swap the physical files of the target and transient tables, then rebuild
889 : * the target's indexes and throw away the transient table. Security context
890 : * swapping is handled by the called function, so it is not needed here.
891 : */
892 : static void
893 498 : refresh_by_heap_swap(Oid matviewOid, Oid OIDNewHeap, char relpersistence)
894 : {
895 498 : finish_heap_swap(matviewOid, OIDNewHeap, false, false, true, true,
896 : RecentXmin, ReadNextMultiXactId(), relpersistence);
897 492 : }
898 :
899 : /*
900 : * Check whether specified index is usable for match merge.
901 : */
902 : static bool
903 156 : is_usable_unique_index(Relation indexRel)
904 : {
905 156 : Form_pg_index indexStruct = indexRel->rd_index;
906 :
907 : /*
908 : * Must be unique, valid, immediate, non-partial, and be defined over
909 : * plain user columns (not expressions).
910 : */
911 156 : if (indexStruct->indisunique &&
912 156 : indexStruct->indimmediate &&
913 312 : indexStruct->indisvalid &&
914 156 : RelationGetIndexPredicate(indexRel) == NIL &&
915 150 : indexStruct->indnatts > 0)
916 : {
917 : /*
918 : * The point of groveling through the index columns individually is to
919 : * reject both index expressions and system columns. Currently,
920 : * matviews couldn't have OID columns so there's no way to create an
921 : * index on a system column; but maybe someday that wouldn't be true,
922 : * so let's be safe.
923 : */
924 150 : int numatts = indexStruct->indnatts;
925 : int i;
926 :
927 326 : for (i = 0; i < numatts; i++)
928 : {
929 182 : int attnum = indexStruct->indkey.values[i];
930 :
931 182 : if (attnum <= 0)
932 6 : return false;
933 : }
934 144 : return true;
935 : }
936 6 : return false;
937 : }
938 :
939 :
940 : /*
941 : * This should be used to test whether the backend is in a context where it is
942 : * OK to allow DML statements to modify materialized views. We only want to
943 : * allow that for internal code driven by the materialized view definition,
944 : * not for arbitrary user-supplied code.
945 : *
946 : * While the function names reflect the fact that their main intended use is
947 : * incremental maintenance of materialized views (in response to changes to
948 : * the data in referenced relations), they are initially used to allow REFRESH
949 : * without blocking concurrent reads.
950 : */
951 : bool
952 120 : MatViewIncrementalMaintenanceIsEnabled(void)
953 : {
954 120 : return matview_maintenance_depth > 0;
955 : }
956 :
957 : static void
958 60 : OpenMatViewIncrementalMaintenance(void)
959 : {
960 60 : matview_maintenance_depth++;
961 60 : }
962 :
963 : static void
964 60 : CloseMatViewIncrementalMaintenance(void)
965 : {
966 60 : matview_maintenance_depth--;
967 : Assert(matview_maintenance_depth >= 0);
968 60 : }
|