Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * matview.c
4 : * materialized view support
5 : *
6 : * Portions Copyright (c) 1996-2024, PostgreSQL Global Development Group
7 : * Portions Copyright (c) 1994, Regents of the University of California
8 : *
9 : *
10 : * IDENTIFICATION
11 : * src/backend/commands/matview.c
12 : *
13 : *-------------------------------------------------------------------------
14 : */
15 : #include "postgres.h"
16 :
17 : #include "access/genam.h"
18 : #include "access/heapam.h"
19 : #include "access/htup_details.h"
20 : #include "access/multixact.h"
21 : #include "access/tableam.h"
22 : #include "access/xact.h"
23 : #include "catalog/indexing.h"
24 : #include "catalog/namespace.h"
25 : #include "catalog/pg_am.h"
26 : #include "catalog/pg_opclass.h"
27 : #include "commands/cluster.h"
28 : #include "commands/matview.h"
29 : #include "commands/tablecmds.h"
30 : #include "commands/tablespace.h"
31 : #include "executor/executor.h"
32 : #include "executor/spi.h"
33 : #include "miscadmin.h"
34 : #include "pgstat.h"
35 : #include "rewrite/rewriteHandler.h"
36 : #include "storage/lmgr.h"
37 : #include "tcop/tcopprot.h"
38 : #include "utils/builtins.h"
39 : #include "utils/lsyscache.h"
40 : #include "utils/rel.h"
41 : #include "utils/snapmgr.h"
42 : #include "utils/syscache.h"
43 :
44 :
45 : typedef struct
46 : {
47 : DestReceiver pub; /* publicly-known function pointers */
48 : Oid transientoid; /* OID of new heap into which to store */
49 : /* These fields are filled by transientrel_startup: */
50 : Relation transientrel; /* relation to write to */
51 : CommandId output_cid; /* cmin to insert in output tuples */
52 : int ti_options; /* table_tuple_insert performance options */
53 : BulkInsertState bistate; /* bulk insert state */
54 : } DR_transientrel;
55 :
56 : static int matview_maintenance_depth = 0;
57 :
58 : static void transientrel_startup(DestReceiver *self, int operation, TupleDesc typeinfo);
59 : static bool transientrel_receive(TupleTableSlot *slot, DestReceiver *self);
60 : static void transientrel_shutdown(DestReceiver *self);
61 : static void transientrel_destroy(DestReceiver *self);
62 : static uint64 refresh_matview_datafill(DestReceiver *dest, Query *query,
63 : const char *queryString);
64 : static char *make_temptable_name_n(char *tempname, int n);
65 : static void refresh_by_match_merge(Oid matviewOid, Oid tempOid, Oid relowner,
66 : int save_sec_context);
67 : static void refresh_by_heap_swap(Oid matviewOid, Oid OIDNewHeap, char relpersistence);
68 : static bool is_usable_unique_index(Relation indexRel);
69 : static void OpenMatViewIncrementalMaintenance(void);
70 : static void CloseMatViewIncrementalMaintenance(void);
71 :
72 : /*
73 : * SetMatViewPopulatedState
74 : * Mark a materialized view as populated, or not.
75 : *
76 : * NOTE: caller must be holding an appropriate lock on the relation.
77 : */
78 : void
79 602 : SetMatViewPopulatedState(Relation relation, bool newstate)
80 : {
81 : Relation pgrel;
82 : HeapTuple tuple;
83 :
84 : Assert(relation->rd_rel->relkind == RELKIND_MATVIEW);
85 :
86 : /*
87 : * Update relation's pg_class entry. Crucial side-effect: other backends
88 : * (and this one too!) are sent SI message to make them rebuild relcache
89 : * entries.
90 : */
91 602 : pgrel = table_open(RelationRelationId, RowExclusiveLock);
92 602 : tuple = SearchSysCacheCopy1(RELOID,
93 : ObjectIdGetDatum(RelationGetRelid(relation)));
94 602 : if (!HeapTupleIsValid(tuple))
95 0 : elog(ERROR, "cache lookup failed for relation %u",
96 : RelationGetRelid(relation));
97 :
98 602 : ((Form_pg_class) GETSTRUCT(tuple))->relispopulated = newstate;
99 :
100 602 : CatalogTupleUpdate(pgrel, &tuple->t_self, tuple);
101 :
102 602 : heap_freetuple(tuple);
103 602 : table_close(pgrel, RowExclusiveLock);
104 :
105 : /*
106 : * Advance command counter to make the updated pg_class row locally
107 : * visible.
108 : */
109 602 : CommandCounterIncrement();
110 602 : }
111 :
112 : /*
113 : * ExecRefreshMatView -- execute a REFRESH MATERIALIZED VIEW command
114 : *
115 : * If WITH NO DATA was specified, this is effectively like a TRUNCATE;
116 : * otherwise it is like a TRUNCATE followed by an INSERT using the SELECT
117 : * statement associated with the materialized view. The statement node's
118 : * skipData field shows whether the clause was used.
119 : */
120 : ObjectAddress
121 258 : ExecRefreshMatView(RefreshMatViewStmt *stmt, const char *queryString,
122 : ParamListInfo params, QueryCompletion *qc)
123 : {
124 : Oid matviewOid;
125 : LOCKMODE lockmode;
126 :
127 : /* Determine strength of lock needed. */
128 258 : lockmode = stmt->concurrent ? ExclusiveLock : AccessExclusiveLock;
129 :
130 : /*
131 : * Get a lock until end of transaction.
132 : */
133 258 : matviewOid = RangeVarGetRelidExtended(stmt->relation,
134 : lockmode, 0,
135 : RangeVarCallbackMaintainsTable,
136 : NULL);
137 :
138 252 : return RefreshMatViewByOid(matviewOid, stmt->skipData, stmt->concurrent,
139 : queryString, params, qc);
140 : }
141 :
142 : /*
143 : * RefreshMatViewByOid -- refresh materialized view by OID
144 : *
145 : * This refreshes the materialized view by creating a new table and swapping
146 : * the relfilenumbers of the new table and the old materialized view, so the OID
147 : * of the original materialized view is preserved. Thus we do not lose GRANT
148 : * nor references to this materialized view.
149 : *
150 : * If skipData is true, this is effectively like a TRUNCATE; otherwise it is
151 : * like a TRUNCATE followed by an INSERT using the SELECT statement associated
152 : * with the materialized view.
153 : *
154 : * Indexes are rebuilt too, via REINDEX. Since we are effectively bulk-loading
155 : * the new heap, it's better to create the indexes afterwards than to fill them
156 : * incrementally while we load.
157 : *
158 : * The matview's "populated" state is changed based on whether the contents
159 : * reflect the result set of the materialized view's query.
160 : */
161 : ObjectAddress
162 608 : RefreshMatViewByOid(Oid matviewOid, bool skipData, bool concurrent,
163 : const char *queryString, ParamListInfo params,
164 : QueryCompletion *qc)
165 : {
166 : Relation matviewRel;
167 : RewriteRule *rule;
168 : List *actions;
169 : Query *dataQuery;
170 : Oid tableSpace;
171 : Oid relowner;
172 : Oid OIDNewHeap;
173 : DestReceiver *dest;
174 608 : uint64 processed = 0;
175 : char relpersistence;
176 : Oid save_userid;
177 : int save_sec_context;
178 : int save_nestlevel;
179 : ObjectAddress address;
180 :
181 608 : matviewRel = table_open(matviewOid, NoLock);
182 608 : relowner = matviewRel->rd_rel->relowner;
183 :
184 : /*
185 : * Switch to the owner's userid, so that any functions are run as that
186 : * user. Also lock down security-restricted operations and arrange to
187 : * make GUC variable changes local to this command.
188 : */
189 608 : GetUserIdAndSecContext(&save_userid, &save_sec_context);
190 608 : SetUserIdAndSecContext(relowner,
191 : save_sec_context | SECURITY_RESTRICTED_OPERATION);
192 608 : save_nestlevel = NewGUCNestLevel();
193 608 : RestrictSearchPath();
194 :
195 : /* Make sure it is a materialized view. */
196 608 : if (matviewRel->rd_rel->relkind != RELKIND_MATVIEW)
197 0 : ereport(ERROR,
198 : (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
199 : errmsg("\"%s\" is not a materialized view",
200 : RelationGetRelationName(matviewRel))));
201 :
202 : /* Check that CONCURRENTLY is not specified if not populated. */
203 608 : if (concurrent && !RelationIsPopulated(matviewRel))
204 0 : ereport(ERROR,
205 : (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
206 : errmsg("CONCURRENTLY cannot be used when the materialized view is not populated")));
207 :
208 : /* Check that conflicting options have not been specified. */
209 608 : if (concurrent && skipData)
210 6 : ereport(ERROR,
211 : (errcode(ERRCODE_SYNTAX_ERROR),
212 : errmsg("%s and %s options cannot be used together",
213 : "CONCURRENTLY", "WITH NO DATA")));
214 :
215 : /*
216 : * Check that everything is correct for a refresh. Problems at this point
217 : * are internal errors, so elog is sufficient.
218 : */
219 602 : if (matviewRel->rd_rel->relhasrules == false ||
220 602 : matviewRel->rd_rules->numLocks < 1)
221 0 : elog(ERROR,
222 : "materialized view \"%s\" is missing rewrite information",
223 : RelationGetRelationName(matviewRel));
224 :
225 602 : if (matviewRel->rd_rules->numLocks > 1)
226 0 : elog(ERROR,
227 : "materialized view \"%s\" has too many rules",
228 : RelationGetRelationName(matviewRel));
229 :
230 602 : rule = matviewRel->rd_rules->rules[0];
231 602 : if (rule->event != CMD_SELECT || !(rule->isInstead))
232 0 : elog(ERROR,
233 : "the rule for materialized view \"%s\" is not a SELECT INSTEAD OF rule",
234 : RelationGetRelationName(matviewRel));
235 :
236 602 : actions = rule->actions;
237 602 : if (list_length(actions) != 1)
238 0 : elog(ERROR,
239 : "the rule for materialized view \"%s\" is not a single action",
240 : RelationGetRelationName(matviewRel));
241 :
242 : /*
243 : * Check that there is a unique index with no WHERE clause on one or more
244 : * columns of the materialized view if CONCURRENTLY is specified.
245 : */
246 602 : if (concurrent)
247 : {
248 78 : List *indexoidlist = RelationGetIndexList(matviewRel);
249 : ListCell *indexoidscan;
250 78 : bool hasUniqueIndex = false;
251 :
252 90 : foreach(indexoidscan, indexoidlist)
253 : {
254 84 : Oid indexoid = lfirst_oid(indexoidscan);
255 : Relation indexRel;
256 :
257 84 : indexRel = index_open(indexoid, AccessShareLock);
258 84 : hasUniqueIndex = is_usable_unique_index(indexRel);
259 84 : index_close(indexRel, AccessShareLock);
260 84 : if (hasUniqueIndex)
261 72 : break;
262 : }
263 :
264 78 : list_free(indexoidlist);
265 :
266 78 : if (!hasUniqueIndex)
267 6 : ereport(ERROR,
268 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
269 : errmsg("cannot refresh materialized view \"%s\" concurrently",
270 : quote_qualified_identifier(get_namespace_name(RelationGetNamespace(matviewRel)),
271 : RelationGetRelationName(matviewRel))),
272 : errhint("Create a unique index with no WHERE clause on one or more columns of the materialized view.")));
273 : }
274 :
275 : /*
276 : * The stored query was rewritten at the time of the MV definition, but
277 : * has not been scribbled on by the planner.
278 : */
279 596 : dataQuery = linitial_node(Query, actions);
280 :
281 : /*
282 : * Check for active uses of the relation in the current transaction, such
283 : * as open scans.
284 : *
285 : * NB: We count on this to protect us against problems with refreshing the
286 : * data using TABLE_INSERT_FROZEN.
287 : */
288 596 : CheckTableNotInUse(matviewRel, "REFRESH MATERIALIZED VIEW");
289 :
290 : /*
291 : * Tentatively mark the matview as populated or not (this will roll back
292 : * if we fail later).
293 : */
294 596 : SetMatViewPopulatedState(matviewRel, !skipData);
295 :
296 : /* Concurrent refresh builds new data in temp tablespace, and does diff. */
297 596 : if (concurrent)
298 : {
299 72 : tableSpace = GetDefaultTablespace(RELPERSISTENCE_TEMP, false);
300 72 : relpersistence = RELPERSISTENCE_TEMP;
301 : }
302 : else
303 : {
304 524 : tableSpace = matviewRel->rd_rel->reltablespace;
305 524 : relpersistence = matviewRel->rd_rel->relpersistence;
306 : }
307 :
308 : /*
309 : * Create the transient table that will receive the regenerated data. Lock
310 : * it against access by any other process until commit (by which time it
311 : * will be gone).
312 : */
313 1192 : OIDNewHeap = make_new_heap(matviewOid, tableSpace,
314 596 : matviewRel->rd_rel->relam,
315 : relpersistence, ExclusiveLock);
316 596 : LockRelationOid(OIDNewHeap, AccessExclusiveLock);
317 596 : dest = CreateTransientRelDestReceiver(OIDNewHeap);
318 :
319 : /* Generate the data, if wanted. */
320 596 : if (!skipData)
321 596 : processed = refresh_matview_datafill(dest, dataQuery, queryString);
322 :
323 : /* Make the matview match the newly generated data. */
324 554 : if (concurrent)
325 : {
326 72 : int old_depth = matview_maintenance_depth;
327 :
328 72 : PG_TRY();
329 : {
330 72 : refresh_by_match_merge(matviewOid, OIDNewHeap, relowner,
331 : save_sec_context);
332 : }
333 12 : PG_CATCH();
334 : {
335 12 : matview_maintenance_depth = old_depth;
336 12 : PG_RE_THROW();
337 : }
338 60 : PG_END_TRY();
339 : Assert(matview_maintenance_depth == old_depth);
340 : }
341 : else
342 : {
343 482 : refresh_by_heap_swap(matviewOid, OIDNewHeap, relpersistence);
344 :
345 : /*
346 : * Inform cumulative stats system about our activity: basically, we
347 : * truncated the matview and inserted some new data. (The concurrent
348 : * code path above doesn't need to worry about this because the
349 : * inserts and deletes it issues get counted by lower-level code.)
350 : */
351 476 : pgstat_count_truncate(matviewRel);
352 476 : if (!skipData)
353 476 : pgstat_count_heap_insert(matviewRel, processed);
354 : }
355 :
356 536 : table_close(matviewRel, NoLock);
357 :
358 : /* Roll back any GUC changes */
359 536 : AtEOXact_GUC(false, save_nestlevel);
360 :
361 : /* Restore userid and security context */
362 536 : SetUserIdAndSecContext(save_userid, save_sec_context);
363 :
364 536 : ObjectAddressSet(address, RelationRelationId, matviewOid);
365 :
366 : /*
367 : * Save the rowcount so that pg_stat_statements can track the total number
368 : * of rows processed by REFRESH MATERIALIZED VIEW command. Note that we
369 : * still don't display the rowcount in the command completion tag output,
370 : * i.e., the display_rowcount flag of CMDTAG_REFRESH_MATERIALIZED_VIEW
371 : * command tag is left false in cmdtaglist.h. Otherwise, the change of
372 : * completion tag output might break applications using it.
373 : */
374 536 : if (qc)
375 524 : SetQueryCompletion(qc, CMDTAG_REFRESH_MATERIALIZED_VIEW, processed);
376 :
377 536 : return address;
378 : }
379 :
380 : /*
381 : * refresh_matview_datafill
382 : *
383 : * Execute the given query, sending result rows to "dest" (which will
384 : * insert them into the target matview).
385 : *
386 : * Returns number of rows inserted.
387 : */
388 : static uint64
389 596 : refresh_matview_datafill(DestReceiver *dest, Query *query,
390 : const char *queryString)
391 : {
392 : List *rewritten;
393 : PlannedStmt *plan;
394 : QueryDesc *queryDesc;
395 : Query *copied_query;
396 : uint64 processed;
397 :
398 : /* Lock and rewrite, using a copy to preserve the original query. */
399 596 : copied_query = copyObject(query);
400 596 : AcquireRewriteLocks(copied_query, true, false);
401 596 : rewritten = QueryRewrite(copied_query);
402 :
403 : /* SELECT should never rewrite to more or less than one SELECT query */
404 596 : if (list_length(rewritten) != 1)
405 0 : elog(ERROR, "unexpected rewrite result for REFRESH MATERIALIZED VIEW");
406 596 : query = (Query *) linitial(rewritten);
407 :
408 : /* Check for user-requested abort. */
409 596 : CHECK_FOR_INTERRUPTS();
410 :
411 : /* Plan the query which will generate data for the refresh. */
412 596 : plan = pg_plan_query(query, queryString, CURSOR_OPT_PARALLEL_OK, NULL);
413 :
414 : /*
415 : * Use a snapshot with an updated command ID to ensure this query sees
416 : * results of any previously executed queries. (This could only matter if
417 : * the planner executed an allegedly-stable function that changed the
418 : * database contents, but let's do it anyway to be safe.)
419 : */
420 584 : PushCopiedSnapshot(GetActiveSnapshot());
421 584 : UpdateActiveSnapshotCommandId();
422 :
423 : /* Create a QueryDesc, redirecting output to our tuple receiver */
424 584 : queryDesc = CreateQueryDesc(plan, queryString,
425 : GetActiveSnapshot(), InvalidSnapshot,
426 : dest, NULL, NULL, 0);
427 :
428 : /* call ExecutorStart to prepare the plan for execution */
429 584 : ExecutorStart(queryDesc, 0);
430 :
431 : /* run the plan */
432 584 : ExecutorRun(queryDesc, ForwardScanDirection, 0, true);
433 :
434 554 : processed = queryDesc->estate->es_processed;
435 :
436 : /* and clean up */
437 554 : ExecutorFinish(queryDesc);
438 554 : ExecutorEnd(queryDesc);
439 :
440 554 : FreeQueryDesc(queryDesc);
441 :
442 554 : PopActiveSnapshot();
443 :
444 554 : return processed;
445 : }
446 :
447 : DestReceiver *
448 596 : CreateTransientRelDestReceiver(Oid transientoid)
449 : {
450 596 : DR_transientrel *self = (DR_transientrel *) palloc0(sizeof(DR_transientrel));
451 :
452 596 : self->pub.receiveSlot = transientrel_receive;
453 596 : self->pub.rStartup = transientrel_startup;
454 596 : self->pub.rShutdown = transientrel_shutdown;
455 596 : self->pub.rDestroy = transientrel_destroy;
456 596 : self->pub.mydest = DestTransientRel;
457 596 : self->transientoid = transientoid;
458 :
459 596 : return (DestReceiver *) self;
460 : }
461 :
462 : /*
463 : * transientrel_startup --- executor startup
464 : */
465 : static void
466 584 : transientrel_startup(DestReceiver *self, int operation, TupleDesc typeinfo)
467 : {
468 584 : DR_transientrel *myState = (DR_transientrel *) self;
469 : Relation transientrel;
470 :
471 584 : transientrel = table_open(myState->transientoid, NoLock);
472 :
473 : /*
474 : * Fill private fields of myState for use by later routines
475 : */
476 584 : myState->transientrel = transientrel;
477 584 : myState->output_cid = GetCurrentCommandId(true);
478 584 : myState->ti_options = TABLE_INSERT_SKIP_FSM | TABLE_INSERT_FROZEN;
479 584 : myState->bistate = GetBulkInsertState();
480 :
481 : /*
482 : * Valid smgr_targblock implies something already wrote to the relation.
483 : * This may be harmless, but this function hasn't planned for it.
484 : */
485 : Assert(RelationGetTargetBlock(transientrel) == InvalidBlockNumber);
486 584 : }
487 :
488 : /*
489 : * transientrel_receive --- receive one tuple
490 : */
491 : static bool
492 3814 : transientrel_receive(TupleTableSlot *slot, DestReceiver *self)
493 : {
494 3814 : DR_transientrel *myState = (DR_transientrel *) self;
495 :
496 : /*
497 : * Note that the input slot might not be of the type of the target
498 : * relation. That's supported by table_tuple_insert(), but slightly less
499 : * efficient than inserting with the right slot - but the alternative
500 : * would be to copy into a slot of the right type, which would not be
501 : * cheap either. This also doesn't allow accessing per-AM data (say a
502 : * tuple's xmin), but since we don't do that here...
503 : */
504 :
505 3814 : table_tuple_insert(myState->transientrel,
506 : slot,
507 : myState->output_cid,
508 : myState->ti_options,
509 : myState->bistate);
510 :
511 : /* We know this is a newly created relation, so there are no indexes */
512 :
513 3814 : return true;
514 : }
515 :
516 : /*
517 : * transientrel_shutdown --- executor end
518 : */
519 : static void
520 554 : transientrel_shutdown(DestReceiver *self)
521 : {
522 554 : DR_transientrel *myState = (DR_transientrel *) self;
523 :
524 554 : FreeBulkInsertState(myState->bistate);
525 :
526 554 : table_finish_bulk_insert(myState->transientrel, myState->ti_options);
527 :
528 : /* close transientrel, but keep lock until commit */
529 554 : table_close(myState->transientrel, NoLock);
530 554 : myState->transientrel = NULL;
531 554 : }
532 :
533 : /*
534 : * transientrel_destroy --- release DestReceiver object
535 : */
536 : static void
537 0 : transientrel_destroy(DestReceiver *self)
538 : {
539 0 : pfree(self);
540 0 : }
541 :
542 :
543 : /*
544 : * Given a qualified temporary table name, append an underscore followed by
545 : * the given integer, to make a new table name based on the old one.
546 : * The result is a palloc'd string.
547 : *
548 : * As coded, this would fail to make a valid SQL name if the given name were,
549 : * say, "FOO"."BAR". Currently, the table name portion of the input will
550 : * never be double-quoted because it's of the form "pg_temp_NNN", cf
551 : * make_new_heap(). But we might have to work harder someday.
552 : */
553 : static char *
554 72 : make_temptable_name_n(char *tempname, int n)
555 : {
556 : StringInfoData namebuf;
557 :
558 72 : initStringInfo(&namebuf);
559 72 : appendStringInfoString(&namebuf, tempname);
560 72 : appendStringInfo(&namebuf, "_%d", n);
561 72 : return namebuf.data;
562 : }
563 :
564 : /*
565 : * refresh_by_match_merge
566 : *
567 : * Refresh a materialized view with transactional semantics, while allowing
568 : * concurrent reads.
569 : *
570 : * This is called after a new version of the data has been created in a
571 : * temporary table. It performs a full outer join against the old version of
572 : * the data, producing "diff" results. This join cannot work if there are any
573 : * duplicated rows in either the old or new versions, in the sense that every
574 : * column would compare as equal between the two rows. It does work correctly
575 : * in the face of rows which have at least one NULL value, with all non-NULL
576 : * columns equal. The behavior of NULLs on equality tests and on UNIQUE
577 : * indexes turns out to be quite convenient here; the tests we need to make
578 : * are consistent with default behavior. If there is at least one UNIQUE
579 : * index on the materialized view, we have exactly the guarantee we need.
580 : *
581 : * The temporary table used to hold the diff results contains just the TID of
582 : * the old record (if matched) and the ROW from the new table as a single
583 : * column of complex record type (if matched).
584 : *
585 : * Once we have the diff table, we perform set-based DELETE and INSERT
586 : * operations against the materialized view, and discard both temporary
587 : * tables.
588 : *
589 : * Everything from the generation of the new data to applying the differences
590 : * takes place under cover of an ExclusiveLock, since it seems as though we
591 : * would want to prohibit not only concurrent REFRESH operations, but also
592 : * incremental maintenance. It also doesn't seem reasonable or safe to allow
593 : * SELECT FOR UPDATE or SELECT FOR SHARE on rows being updated or deleted by
594 : * this command.
595 : */
596 : static void
597 72 : refresh_by_match_merge(Oid matviewOid, Oid tempOid, Oid relowner,
598 : int save_sec_context)
599 : {
600 : StringInfoData querybuf;
601 : Relation matviewRel;
602 : Relation tempRel;
603 : char *matviewname;
604 : char *tempname;
605 : char *diffname;
606 : TupleDesc tupdesc;
607 : bool foundUniqueIndex;
608 : List *indexoidlist;
609 : ListCell *indexoidscan;
610 : int16 relnatts;
611 : Oid *opUsedForQual;
612 :
613 72 : initStringInfo(&querybuf);
614 72 : matviewRel = table_open(matviewOid, NoLock);
615 72 : matviewname = quote_qualified_identifier(get_namespace_name(RelationGetNamespace(matviewRel)),
616 72 : RelationGetRelationName(matviewRel));
617 72 : tempRel = table_open(tempOid, NoLock);
618 72 : tempname = quote_qualified_identifier(get_namespace_name(RelationGetNamespace(tempRel)),
619 72 : RelationGetRelationName(tempRel));
620 72 : diffname = make_temptable_name_n(tempname, 2);
621 :
622 72 : relnatts = RelationGetNumberOfAttributes(matviewRel);
623 :
624 : /* Open SPI context. */
625 72 : if (SPI_connect() != SPI_OK_CONNECT)
626 0 : elog(ERROR, "SPI_connect failed");
627 :
628 : /* Analyze the temp table with the new contents. */
629 72 : appendStringInfo(&querybuf, "ANALYZE %s", tempname);
630 72 : if (SPI_exec(querybuf.data, 0) != SPI_OK_UTILITY)
631 0 : elog(ERROR, "SPI_exec failed: %s", querybuf.data);
632 :
633 : /*
634 : * We need to ensure that there are not duplicate rows without NULLs in
635 : * the new data set before we can count on the "diff" results. Check for
636 : * that in a way that allows showing the first duplicated row found. Even
637 : * after we pass this test, a unique index on the materialized view may
638 : * find a duplicate key problem.
639 : *
640 : * Note: here and below, we use "tablename.*::tablerowtype" as a hack to
641 : * keep ".*" from being expanded into multiple columns in a SELECT list.
642 : * Compare ruleutils.c's get_variable().
643 : */
644 72 : resetStringInfo(&querybuf);
645 72 : appendStringInfo(&querybuf,
646 : "SELECT newdata.*::%s FROM %s newdata "
647 : "WHERE newdata.* IS NOT NULL AND EXISTS "
648 : "(SELECT 1 FROM %s newdata2 WHERE newdata2.* IS NOT NULL "
649 : "AND newdata2.* OPERATOR(pg_catalog.*=) newdata.* "
650 : "AND newdata2.ctid OPERATOR(pg_catalog.<>) "
651 : "newdata.ctid)",
652 : tempname, tempname, tempname);
653 72 : if (SPI_execute(querybuf.data, false, 1) != SPI_OK_SELECT)
654 0 : elog(ERROR, "SPI_exec failed: %s", querybuf.data);
655 72 : if (SPI_processed > 0)
656 : {
657 : /*
658 : * Note that this ereport() is returning data to the user. Generally,
659 : * we would want to make sure that the user has been granted access to
660 : * this data. However, REFRESH MAT VIEW is only able to be run by the
661 : * owner of the mat view (or a superuser) and therefore there is no
662 : * need to check for access to data in the mat view.
663 : */
664 6 : ereport(ERROR,
665 : (errcode(ERRCODE_CARDINALITY_VIOLATION),
666 : errmsg("new data for materialized view \"%s\" contains duplicate rows without any null columns",
667 : RelationGetRelationName(matviewRel)),
668 : errdetail("Row: %s",
669 : SPI_getvalue(SPI_tuptable->vals[0], SPI_tuptable->tupdesc, 1))));
670 : }
671 :
672 : /*
673 : * Create the temporary "diff" table.
674 : *
675 : * Temporarily switch out of the SECURITY_RESTRICTED_OPERATION context,
676 : * because you cannot create temp tables in SRO context. For extra
677 : * paranoia, add the composite type column only after switching back to
678 : * SRO context.
679 : */
680 66 : SetUserIdAndSecContext(relowner,
681 : save_sec_context | SECURITY_LOCAL_USERID_CHANGE);
682 66 : resetStringInfo(&querybuf);
683 66 : appendStringInfo(&querybuf,
684 : "CREATE TEMP TABLE %s (tid pg_catalog.tid)",
685 : diffname);
686 66 : if (SPI_exec(querybuf.data, 0) != SPI_OK_UTILITY)
687 0 : elog(ERROR, "SPI_exec failed: %s", querybuf.data);
688 66 : SetUserIdAndSecContext(relowner,
689 : save_sec_context | SECURITY_RESTRICTED_OPERATION);
690 66 : resetStringInfo(&querybuf);
691 66 : appendStringInfo(&querybuf,
692 : "ALTER TABLE %s ADD COLUMN newdata %s",
693 : diffname, tempname);
694 66 : if (SPI_exec(querybuf.data, 0) != SPI_OK_UTILITY)
695 0 : elog(ERROR, "SPI_exec failed: %s", querybuf.data);
696 :
697 : /* Start building the query for populating the diff table. */
698 66 : resetStringInfo(&querybuf);
699 66 : appendStringInfo(&querybuf,
700 : "INSERT INTO %s "
701 : "SELECT mv.ctid AS tid, newdata.*::%s AS newdata "
702 : "FROM %s mv FULL JOIN %s newdata ON (",
703 : diffname, tempname, matviewname, tempname);
704 :
705 : /*
706 : * Get the list of index OIDs for the table from the relcache, and look up
707 : * each one in the pg_index syscache. We will test for equality on all
708 : * columns present in all unique indexes which only reference columns and
709 : * include all rows.
710 : */
711 66 : tupdesc = matviewRel->rd_att;
712 66 : opUsedForQual = (Oid *) palloc0(sizeof(Oid) * relnatts);
713 66 : foundUniqueIndex = false;
714 :
715 66 : indexoidlist = RelationGetIndexList(matviewRel);
716 :
717 138 : foreach(indexoidscan, indexoidlist)
718 : {
719 72 : Oid indexoid = lfirst_oid(indexoidscan);
720 : Relation indexRel;
721 :
722 72 : indexRel = index_open(indexoid, RowExclusiveLock);
723 72 : if (is_usable_unique_index(indexRel))
724 : {
725 72 : Form_pg_index indexStruct = indexRel->rd_index;
726 72 : int indnkeyatts = indexStruct->indnkeyatts;
727 : oidvector *indclass;
728 : Datum indclassDatum;
729 : int i;
730 :
731 : /* Must get indclass the hard way. */
732 72 : indclassDatum = SysCacheGetAttrNotNull(INDEXRELID,
733 72 : indexRel->rd_indextuple,
734 : Anum_pg_index_indclass);
735 72 : indclass = (oidvector *) DatumGetPointer(indclassDatum);
736 :
737 : /* Add quals for all columns from this index. */
738 160 : for (i = 0; i < indnkeyatts; i++)
739 : {
740 88 : int attnum = indexStruct->indkey.values[i];
741 88 : Oid opclass = indclass->values[i];
742 88 : Form_pg_attribute attr = TupleDescAttr(tupdesc, attnum - 1);
743 88 : Oid attrtype = attr->atttypid;
744 : HeapTuple cla_ht;
745 : Form_pg_opclass cla_tup;
746 : Oid opfamily;
747 : Oid opcintype;
748 : Oid op;
749 : const char *leftop;
750 : const char *rightop;
751 :
752 : /*
753 : * Identify the equality operator associated with this index
754 : * column. First we need to look up the column's opclass.
755 : */
756 88 : cla_ht = SearchSysCache1(CLAOID, ObjectIdGetDatum(opclass));
757 88 : if (!HeapTupleIsValid(cla_ht))
758 0 : elog(ERROR, "cache lookup failed for opclass %u", opclass);
759 88 : cla_tup = (Form_pg_opclass) GETSTRUCT(cla_ht);
760 : Assert(cla_tup->opcmethod == BTREE_AM_OID);
761 88 : opfamily = cla_tup->opcfamily;
762 88 : opcintype = cla_tup->opcintype;
763 88 : ReleaseSysCache(cla_ht);
764 :
765 88 : op = get_opfamily_member(opfamily, opcintype, opcintype,
766 : BTEqualStrategyNumber);
767 88 : if (!OidIsValid(op))
768 0 : elog(ERROR, "missing operator %d(%u,%u) in opfamily %u",
769 : BTEqualStrategyNumber, opcintype, opcintype, opfamily);
770 :
771 : /*
772 : * If we find the same column with the same equality semantics
773 : * in more than one index, we only need to emit the equality
774 : * clause once.
775 : *
776 : * Since we only remember the last equality operator, this
777 : * code could be fooled into emitting duplicate clauses given
778 : * multiple indexes with several different opclasses ... but
779 : * that's so unlikely it doesn't seem worth spending extra
780 : * code to avoid.
781 : */
782 88 : if (opUsedForQual[attnum - 1] == op)
783 0 : continue;
784 88 : opUsedForQual[attnum - 1] = op;
785 :
786 : /*
787 : * Actually add the qual, ANDed with any others.
788 : */
789 88 : if (foundUniqueIndex)
790 28 : appendStringInfoString(&querybuf, " AND ");
791 :
792 88 : leftop = quote_qualified_identifier("newdata",
793 88 : NameStr(attr->attname));
794 88 : rightop = quote_qualified_identifier("mv",
795 88 : NameStr(attr->attname));
796 :
797 88 : generate_operator_clause(&querybuf,
798 : leftop, attrtype,
799 : op,
800 : rightop, attrtype);
801 :
802 88 : foundUniqueIndex = true;
803 : }
804 : }
805 :
806 : /* Keep the locks, since we're about to run DML which needs them. */
807 72 : index_close(indexRel, NoLock);
808 : }
809 :
810 66 : list_free(indexoidlist);
811 :
812 : /*
813 : * There must be at least one usable unique index on the matview.
814 : *
815 : * ExecRefreshMatView() checks that after taking the exclusive lock on the
816 : * matview. So at least one unique index is guaranteed to exist here
817 : * because the lock is still being held. (One known exception is if a
818 : * function called as part of refreshing the matview drops the index.
819 : * That's a pretty silly thing to do.)
820 : */
821 66 : if (!foundUniqueIndex)
822 6 : ereport(ERROR,
823 : errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
824 : errmsg("could not find suitable unique index on materialized view"));
825 :
826 60 : appendStringInfoString(&querybuf,
827 : " AND newdata.* OPERATOR(pg_catalog.*=) mv.*) "
828 : "WHERE newdata.* IS NULL OR mv.* IS NULL "
829 : "ORDER BY tid");
830 :
831 : /* Populate the temporary "diff" table. */
832 60 : if (SPI_exec(querybuf.data, 0) != SPI_OK_INSERT)
833 0 : elog(ERROR, "SPI_exec failed: %s", querybuf.data);
834 :
835 : /*
836 : * We have no further use for data from the "full-data" temp table, but we
837 : * must keep it around because its type is referenced from the diff table.
838 : */
839 :
840 : /* Analyze the diff table. */
841 60 : resetStringInfo(&querybuf);
842 60 : appendStringInfo(&querybuf, "ANALYZE %s", diffname);
843 60 : if (SPI_exec(querybuf.data, 0) != SPI_OK_UTILITY)
844 0 : elog(ERROR, "SPI_exec failed: %s", querybuf.data);
845 :
846 60 : OpenMatViewIncrementalMaintenance();
847 :
848 : /* Deletes must come before inserts; do them first. */
849 60 : resetStringInfo(&querybuf);
850 60 : appendStringInfo(&querybuf,
851 : "DELETE FROM %s mv WHERE ctid OPERATOR(pg_catalog.=) ANY "
852 : "(SELECT diff.tid FROM %s diff "
853 : "WHERE diff.tid IS NOT NULL "
854 : "AND diff.newdata IS NULL)",
855 : matviewname, diffname);
856 60 : if (SPI_exec(querybuf.data, 0) != SPI_OK_DELETE)
857 0 : elog(ERROR, "SPI_exec failed: %s", querybuf.data);
858 :
859 : /* Inserts go last. */
860 60 : resetStringInfo(&querybuf);
861 60 : appendStringInfo(&querybuf,
862 : "INSERT INTO %s SELECT (diff.newdata).* "
863 : "FROM %s diff WHERE tid IS NULL",
864 : matviewname, diffname);
865 60 : if (SPI_exec(querybuf.data, 0) != SPI_OK_INSERT)
866 0 : elog(ERROR, "SPI_exec failed: %s", querybuf.data);
867 :
868 : /* We're done maintaining the materialized view. */
869 60 : CloseMatViewIncrementalMaintenance();
870 60 : table_close(tempRel, NoLock);
871 60 : table_close(matviewRel, NoLock);
872 :
873 : /* Clean up temp tables. */
874 60 : resetStringInfo(&querybuf);
875 60 : appendStringInfo(&querybuf, "DROP TABLE %s, %s", diffname, tempname);
876 60 : if (SPI_exec(querybuf.data, 0) != SPI_OK_UTILITY)
877 0 : elog(ERROR, "SPI_exec failed: %s", querybuf.data);
878 :
879 : /* Close SPI context. */
880 60 : if (SPI_finish() != SPI_OK_FINISH)
881 0 : elog(ERROR, "SPI_finish failed");
882 60 : }
883 :
884 : /*
885 : * Swap the physical files of the target and transient tables, then rebuild
886 : * the target's indexes and throw away the transient table. Security context
887 : * swapping is handled by the called function, so it is not needed here.
888 : */
889 : static void
890 482 : refresh_by_heap_swap(Oid matviewOid, Oid OIDNewHeap, char relpersistence)
891 : {
892 482 : finish_heap_swap(matviewOid, OIDNewHeap, false, false, true, true,
893 : RecentXmin, ReadNextMultiXactId(), relpersistence);
894 476 : }
895 :
896 : /*
897 : * Check whether specified index is usable for match merge.
898 : */
899 : static bool
900 156 : is_usable_unique_index(Relation indexRel)
901 : {
902 156 : Form_pg_index indexStruct = indexRel->rd_index;
903 :
904 : /*
905 : * Must be unique, valid, immediate, non-partial, and be defined over
906 : * plain user columns (not expressions). We also require it to be a
907 : * btree. Even if we had any other unique index kinds, we'd not know how
908 : * to identify the corresponding equality operator, nor could we be sure
909 : * that the planner could implement the required FULL JOIN with non-btree
910 : * operators.
911 : */
912 156 : if (indexStruct->indisunique &&
913 156 : indexStruct->indimmediate &&
914 156 : indexRel->rd_rel->relam == BTREE_AM_OID &&
915 312 : indexStruct->indisvalid &&
916 156 : RelationGetIndexPredicate(indexRel) == NIL &&
917 150 : indexStruct->indnatts > 0)
918 : {
919 : /*
920 : * The point of groveling through the index columns individually is to
921 : * reject both index expressions and system columns. Currently,
922 : * matviews couldn't have OID columns so there's no way to create an
923 : * index on a system column; but maybe someday that wouldn't be true,
924 : * so let's be safe.
925 : */
926 150 : int numatts = indexStruct->indnatts;
927 : int i;
928 :
929 326 : for (i = 0; i < numatts; i++)
930 : {
931 182 : int attnum = indexStruct->indkey.values[i];
932 :
933 182 : if (attnum <= 0)
934 6 : return false;
935 : }
936 144 : return true;
937 : }
938 6 : return false;
939 : }
940 :
941 :
942 : /*
943 : * This should be used to test whether the backend is in a context where it is
944 : * OK to allow DML statements to modify materialized views. We only want to
945 : * allow that for internal code driven by the materialized view definition,
946 : * not for arbitrary user-supplied code.
947 : *
948 : * While the function names reflect the fact that their main intended use is
949 : * incremental maintenance of materialized views (in response to changes to
950 : * the data in referenced relations), they are initially used to allow REFRESH
951 : * without blocking concurrent reads.
952 : */
953 : bool
954 120 : MatViewIncrementalMaintenanceIsEnabled(void)
955 : {
956 120 : return matview_maintenance_depth > 0;
957 : }
958 :
959 : static void
960 60 : OpenMatViewIncrementalMaintenance(void)
961 : {
962 60 : matview_maintenance_depth++;
963 60 : }
964 :
965 : static void
966 60 : CloseMatViewIncrementalMaintenance(void)
967 : {
968 60 : matview_maintenance_depth--;
969 : Assert(matview_maintenance_depth >= 0);
970 60 : }
|