Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * matview.c
4 : * materialized view support
5 : *
6 : * Portions Copyright (c) 1996-2024, PostgreSQL Global Development Group
7 : * Portions Copyright (c) 1994, Regents of the University of California
8 : *
9 : *
10 : * IDENTIFICATION
11 : * src/backend/commands/matview.c
12 : *
13 : *-------------------------------------------------------------------------
14 : */
15 : #include "postgres.h"
16 :
17 : #include "access/genam.h"
18 : #include "access/heapam.h"
19 : #include "access/htup_details.h"
20 : #include "access/multixact.h"
21 : #include "access/tableam.h"
22 : #include "access/xact.h"
23 : #include "catalog/indexing.h"
24 : #include "catalog/namespace.h"
25 : #include "catalog/pg_am.h"
26 : #include "catalog/pg_opclass.h"
27 : #include "commands/cluster.h"
28 : #include "commands/matview.h"
29 : #include "commands/tablecmds.h"
30 : #include "commands/tablespace.h"
31 : #include "executor/executor.h"
32 : #include "executor/spi.h"
33 : #include "miscadmin.h"
34 : #include "pgstat.h"
35 : #include "rewrite/rewriteHandler.h"
36 : #include "storage/lmgr.h"
37 : #include "tcop/tcopprot.h"
38 : #include "utils/builtins.h"
39 : #include "utils/lsyscache.h"
40 : #include "utils/rel.h"
41 : #include "utils/snapmgr.h"
42 : #include "utils/syscache.h"
43 :
44 :
45 : typedef struct
46 : {
47 : DestReceiver pub; /* publicly-known function pointers */
48 : Oid transientoid; /* OID of new heap into which to store */
49 : /* These fields are filled by transientrel_startup: */
50 : Relation transientrel; /* relation to write to */
51 : CommandId output_cid; /* cmin to insert in output tuples */
52 : int ti_options; /* table_tuple_insert performance options */
53 : BulkInsertState bistate; /* bulk insert state */
54 : } DR_transientrel;
55 :
56 : static int matview_maintenance_depth = 0;
57 :
58 : static void transientrel_startup(DestReceiver *self, int operation, TupleDesc typeinfo);
59 : static bool transientrel_receive(TupleTableSlot *slot, DestReceiver *self);
60 : static void transientrel_shutdown(DestReceiver *self);
61 : static void transientrel_destroy(DestReceiver *self);
62 : static uint64 refresh_matview_datafill(DestReceiver *dest, Query *query,
63 : const char *queryString);
64 : static char *make_temptable_name_n(char *tempname, int n);
65 : static void refresh_by_match_merge(Oid matviewOid, Oid tempOid, Oid relowner,
66 : int save_sec_context);
67 : static void refresh_by_heap_swap(Oid matviewOid, Oid OIDNewHeap, char relpersistence);
68 : static bool is_usable_unique_index(Relation indexRel);
69 : static void OpenMatViewIncrementalMaintenance(void);
70 : static void CloseMatViewIncrementalMaintenance(void);
71 :
72 : /*
73 : * SetMatViewPopulatedState
74 : * Mark a materialized view as populated, or not.
75 : *
76 : * NOTE: caller must be holding an appropriate lock on the relation.
77 : */
78 : void
79 596 : SetMatViewPopulatedState(Relation relation, bool newstate)
80 : {
81 : Relation pgrel;
82 : HeapTuple tuple;
83 :
84 : Assert(relation->rd_rel->relkind == RELKIND_MATVIEW);
85 :
86 : /*
87 : * Update relation's pg_class entry. Crucial side-effect: other backends
88 : * (and this one too!) are sent SI message to make them rebuild relcache
89 : * entries.
90 : */
91 596 : pgrel = table_open(RelationRelationId, RowExclusiveLock);
92 596 : tuple = SearchSysCacheCopy1(RELOID,
93 : ObjectIdGetDatum(RelationGetRelid(relation)));
94 596 : if (!HeapTupleIsValid(tuple))
95 0 : elog(ERROR, "cache lookup failed for relation %u",
96 : RelationGetRelid(relation));
97 :
98 596 : ((Form_pg_class) GETSTRUCT(tuple))->relispopulated = newstate;
99 :
100 596 : CatalogTupleUpdate(pgrel, &tuple->t_self, tuple);
101 :
102 596 : heap_freetuple(tuple);
103 596 : table_close(pgrel, RowExclusiveLock);
104 :
105 : /*
106 : * Advance command counter to make the updated pg_class row locally
107 : * visible.
108 : */
109 596 : CommandCounterIncrement();
110 596 : }
111 :
112 : /*
113 : * ExecRefreshMatView -- execute a REFRESH MATERIALIZED VIEW command
114 : *
115 : * This refreshes the materialized view by creating a new table and swapping
116 : * the relfilenumbers of the new table and the old materialized view, so the OID
117 : * of the original materialized view is preserved. Thus we do not lose GRANT
118 : * nor references to this materialized view.
119 : *
120 : * If WITH NO DATA was specified, this is effectively like a TRUNCATE;
121 : * otherwise it is like a TRUNCATE followed by an INSERT using the SELECT
122 : * statement associated with the materialized view. The statement node's
123 : * skipData field shows whether the clause was used.
124 : *
125 : * Indexes are rebuilt too, via REINDEX. Since we are effectively bulk-loading
126 : * the new heap, it's better to create the indexes afterwards than to fill them
127 : * incrementally while we load.
128 : *
129 : * The matview's "populated" state is changed based on whether the contents
130 : * reflect the result set of the materialized view's query.
131 : */
132 : ObjectAddress
133 258 : ExecRefreshMatView(RefreshMatViewStmt *stmt, const char *queryString,
134 : ParamListInfo params, QueryCompletion *qc)
135 : {
136 : Oid matviewOid;
137 : Relation matviewRel;
138 : RewriteRule *rule;
139 : List *actions;
140 : Query *dataQuery;
141 : Oid tableSpace;
142 : Oid relowner;
143 : Oid OIDNewHeap;
144 : DestReceiver *dest;
145 258 : uint64 processed = 0;
146 : bool concurrent;
147 : LOCKMODE lockmode;
148 : char relpersistence;
149 : Oid save_userid;
150 : int save_sec_context;
151 : int save_nestlevel;
152 : ObjectAddress address;
153 :
154 : /* Determine strength of lock needed. */
155 258 : concurrent = stmt->concurrent;
156 258 : lockmode = concurrent ? ExclusiveLock : AccessExclusiveLock;
157 :
158 : /*
159 : * Get a lock until end of transaction.
160 : */
161 258 : matviewOid = RangeVarGetRelidExtended(stmt->relation,
162 : lockmode, 0,
163 : RangeVarCallbackMaintainsTable,
164 : NULL);
165 252 : matviewRel = table_open(matviewOid, NoLock);
166 252 : relowner = matviewRel->rd_rel->relowner;
167 :
168 : /*
169 : * Switch to the owner's userid, so that any functions are run as that
170 : * user. Also lock down security-restricted operations and arrange to
171 : * make GUC variable changes local to this command.
172 : */
173 252 : GetUserIdAndSecContext(&save_userid, &save_sec_context);
174 252 : SetUserIdAndSecContext(relowner,
175 : save_sec_context | SECURITY_RESTRICTED_OPERATION);
176 252 : save_nestlevel = NewGUCNestLevel();
177 252 : RestrictSearchPath();
178 :
179 : /* Make sure it is a materialized view. */
180 252 : if (matviewRel->rd_rel->relkind != RELKIND_MATVIEW)
181 0 : ereport(ERROR,
182 : (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
183 : errmsg("\"%s\" is not a materialized view",
184 : RelationGetRelationName(matviewRel))));
185 :
186 : /* Check that CONCURRENTLY is not specified if not populated. */
187 252 : if (concurrent && !RelationIsPopulated(matviewRel))
188 0 : ereport(ERROR,
189 : (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
190 : errmsg("CONCURRENTLY cannot be used when the materialized view is not populated")));
191 :
192 : /* Check that conflicting options have not been specified. */
193 252 : if (concurrent && stmt->skipData)
194 6 : ereport(ERROR,
195 : (errcode(ERRCODE_SYNTAX_ERROR),
196 : errmsg("%s and %s options cannot be used together",
197 : "CONCURRENTLY", "WITH NO DATA")));
198 :
199 : /*
200 : * Check that everything is correct for a refresh. Problems at this point
201 : * are internal errors, so elog is sufficient.
202 : */
203 246 : if (matviewRel->rd_rel->relhasrules == false ||
204 246 : matviewRel->rd_rules->numLocks < 1)
205 0 : elog(ERROR,
206 : "materialized view \"%s\" is missing rewrite information",
207 : RelationGetRelationName(matviewRel));
208 :
209 246 : if (matviewRel->rd_rules->numLocks > 1)
210 0 : elog(ERROR,
211 : "materialized view \"%s\" has too many rules",
212 : RelationGetRelationName(matviewRel));
213 :
214 246 : rule = matviewRel->rd_rules->rules[0];
215 246 : if (rule->event != CMD_SELECT || !(rule->isInstead))
216 0 : elog(ERROR,
217 : "the rule for materialized view \"%s\" is not a SELECT INSTEAD OF rule",
218 : RelationGetRelationName(matviewRel));
219 :
220 246 : actions = rule->actions;
221 246 : if (list_length(actions) != 1)
222 0 : elog(ERROR,
223 : "the rule for materialized view \"%s\" is not a single action",
224 : RelationGetRelationName(matviewRel));
225 :
226 : /*
227 : * Check that there is a unique index with no WHERE clause on one or more
228 : * columns of the materialized view if CONCURRENTLY is specified.
229 : */
230 246 : if (concurrent)
231 : {
232 78 : List *indexoidlist = RelationGetIndexList(matviewRel);
233 : ListCell *indexoidscan;
234 78 : bool hasUniqueIndex = false;
235 :
236 90 : foreach(indexoidscan, indexoidlist)
237 : {
238 84 : Oid indexoid = lfirst_oid(indexoidscan);
239 : Relation indexRel;
240 :
241 84 : indexRel = index_open(indexoid, AccessShareLock);
242 84 : hasUniqueIndex = is_usable_unique_index(indexRel);
243 84 : index_close(indexRel, AccessShareLock);
244 84 : if (hasUniqueIndex)
245 72 : break;
246 : }
247 :
248 78 : list_free(indexoidlist);
249 :
250 78 : if (!hasUniqueIndex)
251 6 : ereport(ERROR,
252 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
253 : errmsg("cannot refresh materialized view \"%s\" concurrently",
254 : quote_qualified_identifier(get_namespace_name(RelationGetNamespace(matviewRel)),
255 : RelationGetRelationName(matviewRel))),
256 : errhint("Create a unique index with no WHERE clause on one or more columns of the materialized view.")));
257 : }
258 :
259 : /*
260 : * The stored query was rewritten at the time of the MV definition, but
261 : * has not been scribbled on by the planner.
262 : */
263 240 : dataQuery = linitial_node(Query, actions);
264 :
265 : /*
266 : * Check for active uses of the relation in the current transaction, such
267 : * as open scans.
268 : *
269 : * NB: We count on this to protect us against problems with refreshing the
270 : * data using TABLE_INSERT_FROZEN.
271 : */
272 240 : CheckTableNotInUse(matviewRel, "REFRESH MATERIALIZED VIEW");
273 :
274 : /*
275 : * Tentatively mark the matview as populated or not (this will roll back
276 : * if we fail later).
277 : */
278 240 : SetMatViewPopulatedState(matviewRel, !stmt->skipData);
279 :
280 : /* Concurrent refresh builds new data in temp tablespace, and does diff. */
281 240 : if (concurrent)
282 : {
283 72 : tableSpace = GetDefaultTablespace(RELPERSISTENCE_TEMP, false);
284 72 : relpersistence = RELPERSISTENCE_TEMP;
285 : }
286 : else
287 : {
288 168 : tableSpace = matviewRel->rd_rel->reltablespace;
289 168 : relpersistence = matviewRel->rd_rel->relpersistence;
290 : }
291 :
292 : /*
293 : * Create the transient table that will receive the regenerated data. Lock
294 : * it against access by any other process until commit (by which time it
295 : * will be gone).
296 : */
297 480 : OIDNewHeap = make_new_heap(matviewOid, tableSpace,
298 240 : matviewRel->rd_rel->relam,
299 : relpersistence, ExclusiveLock);
300 240 : LockRelationOid(OIDNewHeap, AccessExclusiveLock);
301 240 : dest = CreateTransientRelDestReceiver(OIDNewHeap);
302 :
303 : /* Generate the data, if wanted. */
304 240 : if (!stmt->skipData)
305 240 : processed = refresh_matview_datafill(dest, dataQuery, queryString);
306 :
307 : /* Make the matview match the newly generated data. */
308 204 : if (concurrent)
309 : {
310 72 : int old_depth = matview_maintenance_depth;
311 :
312 72 : PG_TRY();
313 : {
314 72 : refresh_by_match_merge(matviewOid, OIDNewHeap, relowner,
315 : save_sec_context);
316 : }
317 12 : PG_CATCH();
318 : {
319 12 : matview_maintenance_depth = old_depth;
320 12 : PG_RE_THROW();
321 : }
322 60 : PG_END_TRY();
323 : Assert(matview_maintenance_depth == old_depth);
324 : }
325 : else
326 : {
327 132 : refresh_by_heap_swap(matviewOid, OIDNewHeap, relpersistence);
328 :
329 : /*
330 : * Inform cumulative stats system about our activity: basically, we
331 : * truncated the matview and inserted some new data. (The concurrent
332 : * code path above doesn't need to worry about this because the
333 : * inserts and deletes it issues get counted by lower-level code.)
334 : */
335 126 : pgstat_count_truncate(matviewRel);
336 126 : if (!stmt->skipData)
337 126 : pgstat_count_heap_insert(matviewRel, processed);
338 : }
339 :
340 186 : table_close(matviewRel, NoLock);
341 :
342 : /* Roll back any GUC changes */
343 186 : AtEOXact_GUC(false, save_nestlevel);
344 :
345 : /* Restore userid and security context */
346 186 : SetUserIdAndSecContext(save_userid, save_sec_context);
347 :
348 186 : ObjectAddressSet(address, RelationRelationId, matviewOid);
349 :
350 : /*
351 : * Save the rowcount so that pg_stat_statements can track the total number
352 : * of rows processed by REFRESH MATERIALIZED VIEW command. Note that we
353 : * still don't display the rowcount in the command completion tag output,
354 : * i.e., the display_rowcount flag of CMDTAG_REFRESH_MATERIALIZED_VIEW
355 : * command tag is left false in cmdtaglist.h. Otherwise, the change of
356 : * completion tag output might break applications using it.
357 : */
358 186 : if (qc)
359 186 : SetQueryCompletion(qc, CMDTAG_REFRESH_MATERIALIZED_VIEW, processed);
360 :
361 186 : return address;
362 : }
363 :
364 : /*
365 : * refresh_matview_datafill
366 : *
367 : * Execute the given query, sending result rows to "dest" (which will
368 : * insert them into the target matview).
369 : *
370 : * Returns number of rows inserted.
371 : */
372 : static uint64
373 240 : refresh_matview_datafill(DestReceiver *dest, Query *query,
374 : const char *queryString)
375 : {
376 : List *rewritten;
377 : PlannedStmt *plan;
378 : QueryDesc *queryDesc;
379 : Query *copied_query;
380 : uint64 processed;
381 :
382 : /* Lock and rewrite, using a copy to preserve the original query. */
383 240 : copied_query = copyObject(query);
384 240 : AcquireRewriteLocks(copied_query, true, false);
385 240 : rewritten = QueryRewrite(copied_query);
386 :
387 : /* SELECT should never rewrite to more or less than one SELECT query */
388 240 : if (list_length(rewritten) != 1)
389 0 : elog(ERROR, "unexpected rewrite result for REFRESH MATERIALIZED VIEW");
390 240 : query = (Query *) linitial(rewritten);
391 :
392 : /* Check for user-requested abort. */
393 240 : CHECK_FOR_INTERRUPTS();
394 :
395 : /* Plan the query which will generate data for the refresh. */
396 240 : plan = pg_plan_query(query, queryString, CURSOR_OPT_PARALLEL_OK, NULL);
397 :
398 : /*
399 : * Use a snapshot with an updated command ID to ensure this query sees
400 : * results of any previously executed queries. (This could only matter if
401 : * the planner executed an allegedly-stable function that changed the
402 : * database contents, but let's do it anyway to be safe.)
403 : */
404 234 : PushCopiedSnapshot(GetActiveSnapshot());
405 234 : UpdateActiveSnapshotCommandId();
406 :
407 : /* Create a QueryDesc, redirecting output to our tuple receiver */
408 234 : queryDesc = CreateQueryDesc(plan, queryString,
409 : GetActiveSnapshot(), InvalidSnapshot,
410 : dest, NULL, NULL, 0);
411 :
412 : /* call ExecutorStart to prepare the plan for execution */
413 234 : ExecutorStart(queryDesc, 0);
414 :
415 : /* run the plan */
416 234 : ExecutorRun(queryDesc, ForwardScanDirection, 0, true);
417 :
418 204 : processed = queryDesc->estate->es_processed;
419 :
420 : /* and clean up */
421 204 : ExecutorFinish(queryDesc);
422 204 : ExecutorEnd(queryDesc);
423 :
424 204 : FreeQueryDesc(queryDesc);
425 :
426 204 : PopActiveSnapshot();
427 :
428 204 : return processed;
429 : }
430 :
431 : DestReceiver *
432 240 : CreateTransientRelDestReceiver(Oid transientoid)
433 : {
434 240 : DR_transientrel *self = (DR_transientrel *) palloc0(sizeof(DR_transientrel));
435 :
436 240 : self->pub.receiveSlot = transientrel_receive;
437 240 : self->pub.rStartup = transientrel_startup;
438 240 : self->pub.rShutdown = transientrel_shutdown;
439 240 : self->pub.rDestroy = transientrel_destroy;
440 240 : self->pub.mydest = DestTransientRel;
441 240 : self->transientoid = transientoid;
442 :
443 240 : return (DestReceiver *) self;
444 : }
445 :
446 : /*
447 : * transientrel_startup --- executor startup
448 : */
449 : static void
450 234 : transientrel_startup(DestReceiver *self, int operation, TupleDesc typeinfo)
451 : {
452 234 : DR_transientrel *myState = (DR_transientrel *) self;
453 : Relation transientrel;
454 :
455 234 : transientrel = table_open(myState->transientoid, NoLock);
456 :
457 : /*
458 : * Fill private fields of myState for use by later routines
459 : */
460 234 : myState->transientrel = transientrel;
461 234 : myState->output_cid = GetCurrentCommandId(true);
462 234 : myState->ti_options = TABLE_INSERT_SKIP_FSM | TABLE_INSERT_FROZEN;
463 234 : myState->bistate = GetBulkInsertState();
464 :
465 : /*
466 : * Valid smgr_targblock implies something already wrote to the relation.
467 : * This may be harmless, but this function hasn't planned for it.
468 : */
469 : Assert(RelationGetTargetBlock(transientrel) == InvalidBlockNumber);
470 234 : }
471 :
472 : /*
473 : * transientrel_receive --- receive one tuple
474 : */
475 : static bool
476 586 : transientrel_receive(TupleTableSlot *slot, DestReceiver *self)
477 : {
478 586 : DR_transientrel *myState = (DR_transientrel *) self;
479 :
480 : /*
481 : * Note that the input slot might not be of the type of the target
482 : * relation. That's supported by table_tuple_insert(), but slightly less
483 : * efficient than inserting with the right slot - but the alternative
484 : * would be to copy into a slot of the right type, which would not be
485 : * cheap either. This also doesn't allow accessing per-AM data (say a
486 : * tuple's xmin), but since we don't do that here...
487 : */
488 :
489 586 : table_tuple_insert(myState->transientrel,
490 : slot,
491 : myState->output_cid,
492 : myState->ti_options,
493 : myState->bistate);
494 :
495 : /* We know this is a newly created relation, so there are no indexes */
496 :
497 586 : return true;
498 : }
499 :
500 : /*
501 : * transientrel_shutdown --- executor end
502 : */
503 : static void
504 204 : transientrel_shutdown(DestReceiver *self)
505 : {
506 204 : DR_transientrel *myState = (DR_transientrel *) self;
507 :
508 204 : FreeBulkInsertState(myState->bistate);
509 :
510 204 : table_finish_bulk_insert(myState->transientrel, myState->ti_options);
511 :
512 : /* close transientrel, but keep lock until commit */
513 204 : table_close(myState->transientrel, NoLock);
514 204 : myState->transientrel = NULL;
515 204 : }
516 :
517 : /*
518 : * transientrel_destroy --- release DestReceiver object
519 : */
520 : static void
521 0 : transientrel_destroy(DestReceiver *self)
522 : {
523 0 : pfree(self);
524 0 : }
525 :
526 :
527 : /*
528 : * Given a qualified temporary table name, append an underscore followed by
529 : * the given integer, to make a new table name based on the old one.
530 : * The result is a palloc'd string.
531 : *
532 : * As coded, this would fail to make a valid SQL name if the given name were,
533 : * say, "FOO"."BAR". Currently, the table name portion of the input will
534 : * never be double-quoted because it's of the form "pg_temp_NNN", cf
535 : * make_new_heap(). But we might have to work harder someday.
536 : */
537 : static char *
538 72 : make_temptable_name_n(char *tempname, int n)
539 : {
540 : StringInfoData namebuf;
541 :
542 72 : initStringInfo(&namebuf);
543 72 : appendStringInfoString(&namebuf, tempname);
544 72 : appendStringInfo(&namebuf, "_%d", n);
545 72 : return namebuf.data;
546 : }
547 :
548 : /*
549 : * refresh_by_match_merge
550 : *
551 : * Refresh a materialized view with transactional semantics, while allowing
552 : * concurrent reads.
553 : *
554 : * This is called after a new version of the data has been created in a
555 : * temporary table. It performs a full outer join against the old version of
556 : * the data, producing "diff" results. This join cannot work if there are any
557 : * duplicated rows in either the old or new versions, in the sense that every
558 : * column would compare as equal between the two rows. It does work correctly
559 : * in the face of rows which have at least one NULL value, with all non-NULL
560 : * columns equal. The behavior of NULLs on equality tests and on UNIQUE
561 : * indexes turns out to be quite convenient here; the tests we need to make
562 : * are consistent with default behavior. If there is at least one UNIQUE
563 : * index on the materialized view, we have exactly the guarantee we need.
564 : *
565 : * The temporary table used to hold the diff results contains just the TID of
566 : * the old record (if matched) and the ROW from the new table as a single
567 : * column of complex record type (if matched).
568 : *
569 : * Once we have the diff table, we perform set-based DELETE and INSERT
570 : * operations against the materialized view, and discard both temporary
571 : * tables.
572 : *
573 : * Everything from the generation of the new data to applying the differences
574 : * takes place under cover of an ExclusiveLock, since it seems as though we
575 : * would want to prohibit not only concurrent REFRESH operations, but also
576 : * incremental maintenance. It also doesn't seem reasonable or safe to allow
577 : * SELECT FOR UPDATE or SELECT FOR SHARE on rows being updated or deleted by
578 : * this command.
579 : */
580 : static void
581 72 : refresh_by_match_merge(Oid matviewOid, Oid tempOid, Oid relowner,
582 : int save_sec_context)
583 : {
584 : StringInfoData querybuf;
585 : Relation matviewRel;
586 : Relation tempRel;
587 : char *matviewname;
588 : char *tempname;
589 : char *diffname;
590 : TupleDesc tupdesc;
591 : bool foundUniqueIndex;
592 : List *indexoidlist;
593 : ListCell *indexoidscan;
594 : int16 relnatts;
595 : Oid *opUsedForQual;
596 :
597 72 : initStringInfo(&querybuf);
598 72 : matviewRel = table_open(matviewOid, NoLock);
599 72 : matviewname = quote_qualified_identifier(get_namespace_name(RelationGetNamespace(matviewRel)),
600 72 : RelationGetRelationName(matviewRel));
601 72 : tempRel = table_open(tempOid, NoLock);
602 72 : tempname = quote_qualified_identifier(get_namespace_name(RelationGetNamespace(tempRel)),
603 72 : RelationGetRelationName(tempRel));
604 72 : diffname = make_temptable_name_n(tempname, 2);
605 :
606 72 : relnatts = RelationGetNumberOfAttributes(matviewRel);
607 :
608 : /* Open SPI context. */
609 72 : if (SPI_connect() != SPI_OK_CONNECT)
610 0 : elog(ERROR, "SPI_connect failed");
611 :
612 : /* Analyze the temp table with the new contents. */
613 72 : appendStringInfo(&querybuf, "ANALYZE %s", tempname);
614 72 : if (SPI_exec(querybuf.data, 0) != SPI_OK_UTILITY)
615 0 : elog(ERROR, "SPI_exec failed: %s", querybuf.data);
616 :
617 : /*
618 : * We need to ensure that there are not duplicate rows without NULLs in
619 : * the new data set before we can count on the "diff" results. Check for
620 : * that in a way that allows showing the first duplicated row found. Even
621 : * after we pass this test, a unique index on the materialized view may
622 : * find a duplicate key problem.
623 : *
624 : * Note: here and below, we use "tablename.*::tablerowtype" as a hack to
625 : * keep ".*" from being expanded into multiple columns in a SELECT list.
626 : * Compare ruleutils.c's get_variable().
627 : */
628 72 : resetStringInfo(&querybuf);
629 72 : appendStringInfo(&querybuf,
630 : "SELECT newdata.*::%s FROM %s newdata "
631 : "WHERE newdata.* IS NOT NULL AND EXISTS "
632 : "(SELECT 1 FROM %s newdata2 WHERE newdata2.* IS NOT NULL "
633 : "AND newdata2.* OPERATOR(pg_catalog.*=) newdata.* "
634 : "AND newdata2.ctid OPERATOR(pg_catalog.<>) "
635 : "newdata.ctid)",
636 : tempname, tempname, tempname);
637 72 : if (SPI_execute(querybuf.data, false, 1) != SPI_OK_SELECT)
638 0 : elog(ERROR, "SPI_exec failed: %s", querybuf.data);
639 72 : if (SPI_processed > 0)
640 : {
641 : /*
642 : * Note that this ereport() is returning data to the user. Generally,
643 : * we would want to make sure that the user has been granted access to
644 : * this data. However, REFRESH MAT VIEW is only able to be run by the
645 : * owner of the mat view (or a superuser) and therefore there is no
646 : * need to check for access to data in the mat view.
647 : */
648 6 : ereport(ERROR,
649 : (errcode(ERRCODE_CARDINALITY_VIOLATION),
650 : errmsg("new data for materialized view \"%s\" contains duplicate rows without any null columns",
651 : RelationGetRelationName(matviewRel)),
652 : errdetail("Row: %s",
653 : SPI_getvalue(SPI_tuptable->vals[0], SPI_tuptable->tupdesc, 1))));
654 : }
655 :
656 : /*
657 : * Create the temporary "diff" table.
658 : *
659 : * Temporarily switch out of the SECURITY_RESTRICTED_OPERATION context,
660 : * because you cannot create temp tables in SRO context. For extra
661 : * paranoia, add the composite type column only after switching back to
662 : * SRO context.
663 : */
664 66 : SetUserIdAndSecContext(relowner,
665 : save_sec_context | SECURITY_LOCAL_USERID_CHANGE);
666 66 : resetStringInfo(&querybuf);
667 66 : appendStringInfo(&querybuf,
668 : "CREATE TEMP TABLE %s (tid pg_catalog.tid)",
669 : diffname);
670 66 : if (SPI_exec(querybuf.data, 0) != SPI_OK_UTILITY)
671 0 : elog(ERROR, "SPI_exec failed: %s", querybuf.data);
672 66 : SetUserIdAndSecContext(relowner,
673 : save_sec_context | SECURITY_RESTRICTED_OPERATION);
674 66 : resetStringInfo(&querybuf);
675 66 : appendStringInfo(&querybuf,
676 : "ALTER TABLE %s ADD COLUMN newdata %s",
677 : diffname, tempname);
678 66 : if (SPI_exec(querybuf.data, 0) != SPI_OK_UTILITY)
679 0 : elog(ERROR, "SPI_exec failed: %s", querybuf.data);
680 :
681 : /* Start building the query for populating the diff table. */
682 66 : resetStringInfo(&querybuf);
683 66 : appendStringInfo(&querybuf,
684 : "INSERT INTO %s "
685 : "SELECT mv.ctid AS tid, newdata.*::%s AS newdata "
686 : "FROM %s mv FULL JOIN %s newdata ON (",
687 : diffname, tempname, matviewname, tempname);
688 :
689 : /*
690 : * Get the list of index OIDs for the table from the relcache, and look up
691 : * each one in the pg_index syscache. We will test for equality on all
692 : * columns present in all unique indexes which only reference columns and
693 : * include all rows.
694 : */
695 66 : tupdesc = matviewRel->rd_att;
696 66 : opUsedForQual = (Oid *) palloc0(sizeof(Oid) * relnatts);
697 66 : foundUniqueIndex = false;
698 :
699 66 : indexoidlist = RelationGetIndexList(matviewRel);
700 :
701 138 : foreach(indexoidscan, indexoidlist)
702 : {
703 72 : Oid indexoid = lfirst_oid(indexoidscan);
704 : Relation indexRel;
705 :
706 72 : indexRel = index_open(indexoid, RowExclusiveLock);
707 72 : if (is_usable_unique_index(indexRel))
708 : {
709 72 : Form_pg_index indexStruct = indexRel->rd_index;
710 72 : int indnkeyatts = indexStruct->indnkeyatts;
711 : oidvector *indclass;
712 : Datum indclassDatum;
713 : int i;
714 :
715 : /* Must get indclass the hard way. */
716 72 : indclassDatum = SysCacheGetAttrNotNull(INDEXRELID,
717 72 : indexRel->rd_indextuple,
718 : Anum_pg_index_indclass);
719 72 : indclass = (oidvector *) DatumGetPointer(indclassDatum);
720 :
721 : /* Add quals for all columns from this index. */
722 160 : for (i = 0; i < indnkeyatts; i++)
723 : {
724 88 : int attnum = indexStruct->indkey.values[i];
725 88 : Oid opclass = indclass->values[i];
726 88 : Form_pg_attribute attr = TupleDescAttr(tupdesc, attnum - 1);
727 88 : Oid attrtype = attr->atttypid;
728 : HeapTuple cla_ht;
729 : Form_pg_opclass cla_tup;
730 : Oid opfamily;
731 : Oid opcintype;
732 : Oid op;
733 : const char *leftop;
734 : const char *rightop;
735 :
736 : /*
737 : * Identify the equality operator associated with this index
738 : * column. First we need to look up the column's opclass.
739 : */
740 88 : cla_ht = SearchSysCache1(CLAOID, ObjectIdGetDatum(opclass));
741 88 : if (!HeapTupleIsValid(cla_ht))
742 0 : elog(ERROR, "cache lookup failed for opclass %u", opclass);
743 88 : cla_tup = (Form_pg_opclass) GETSTRUCT(cla_ht);
744 : Assert(cla_tup->opcmethod == BTREE_AM_OID);
745 88 : opfamily = cla_tup->opcfamily;
746 88 : opcintype = cla_tup->opcintype;
747 88 : ReleaseSysCache(cla_ht);
748 :
749 88 : op = get_opfamily_member(opfamily, opcintype, opcintype,
750 : BTEqualStrategyNumber);
751 88 : if (!OidIsValid(op))
752 0 : elog(ERROR, "missing operator %d(%u,%u) in opfamily %u",
753 : BTEqualStrategyNumber, opcintype, opcintype, opfamily);
754 :
755 : /*
756 : * If we find the same column with the same equality semantics
757 : * in more than one index, we only need to emit the equality
758 : * clause once.
759 : *
760 : * Since we only remember the last equality operator, this
761 : * code could be fooled into emitting duplicate clauses given
762 : * multiple indexes with several different opclasses ... but
763 : * that's so unlikely it doesn't seem worth spending extra
764 : * code to avoid.
765 : */
766 88 : if (opUsedForQual[attnum - 1] == op)
767 0 : continue;
768 88 : opUsedForQual[attnum - 1] = op;
769 :
770 : /*
771 : * Actually add the qual, ANDed with any others.
772 : */
773 88 : if (foundUniqueIndex)
774 28 : appendStringInfoString(&querybuf, " AND ");
775 :
776 88 : leftop = quote_qualified_identifier("newdata",
777 88 : NameStr(attr->attname));
778 88 : rightop = quote_qualified_identifier("mv",
779 88 : NameStr(attr->attname));
780 :
781 88 : generate_operator_clause(&querybuf,
782 : leftop, attrtype,
783 : op,
784 : rightop, attrtype);
785 :
786 88 : foundUniqueIndex = true;
787 : }
788 : }
789 :
790 : /* Keep the locks, since we're about to run DML which needs them. */
791 72 : index_close(indexRel, NoLock);
792 : }
793 :
794 66 : list_free(indexoidlist);
795 :
796 : /*
797 : * There must be at least one usable unique index on the matview.
798 : *
799 : * ExecRefreshMatView() checks that after taking the exclusive lock on the
800 : * matview. So at least one unique index is guaranteed to exist here
801 : * because the lock is still being held. (One known exception is if a
802 : * function called as part of refreshing the matview drops the index.
803 : * That's a pretty silly thing to do.)
804 : */
805 66 : if (!foundUniqueIndex)
806 6 : elog(ERROR, "could not find suitable unique index on materialized view");
807 :
808 60 : appendStringInfoString(&querybuf,
809 : " AND newdata.* OPERATOR(pg_catalog.*=) mv.*) "
810 : "WHERE newdata.* IS NULL OR mv.* IS NULL "
811 : "ORDER BY tid");
812 :
813 : /* Populate the temporary "diff" table. */
814 60 : if (SPI_exec(querybuf.data, 0) != SPI_OK_INSERT)
815 0 : elog(ERROR, "SPI_exec failed: %s", querybuf.data);
816 :
817 : /*
818 : * We have no further use for data from the "full-data" temp table, but we
819 : * must keep it around because its type is referenced from the diff table.
820 : */
821 :
822 : /* Analyze the diff table. */
823 60 : resetStringInfo(&querybuf);
824 60 : appendStringInfo(&querybuf, "ANALYZE %s", diffname);
825 60 : if (SPI_exec(querybuf.data, 0) != SPI_OK_UTILITY)
826 0 : elog(ERROR, "SPI_exec failed: %s", querybuf.data);
827 :
828 60 : OpenMatViewIncrementalMaintenance();
829 :
830 : /* Deletes must come before inserts; do them first. */
831 60 : resetStringInfo(&querybuf);
832 60 : appendStringInfo(&querybuf,
833 : "DELETE FROM %s mv WHERE ctid OPERATOR(pg_catalog.=) ANY "
834 : "(SELECT diff.tid FROM %s diff "
835 : "WHERE diff.tid IS NOT NULL "
836 : "AND diff.newdata IS NULL)",
837 : matviewname, diffname);
838 60 : if (SPI_exec(querybuf.data, 0) != SPI_OK_DELETE)
839 0 : elog(ERROR, "SPI_exec failed: %s", querybuf.data);
840 :
841 : /* Inserts go last. */
842 60 : resetStringInfo(&querybuf);
843 60 : appendStringInfo(&querybuf,
844 : "INSERT INTO %s SELECT (diff.newdata).* "
845 : "FROM %s diff WHERE tid IS NULL",
846 : matviewname, diffname);
847 60 : if (SPI_exec(querybuf.data, 0) != SPI_OK_INSERT)
848 0 : elog(ERROR, "SPI_exec failed: %s", querybuf.data);
849 :
850 : /* We're done maintaining the materialized view. */
851 60 : CloseMatViewIncrementalMaintenance();
852 60 : table_close(tempRel, NoLock);
853 60 : table_close(matviewRel, NoLock);
854 :
855 : /* Clean up temp tables. */
856 60 : resetStringInfo(&querybuf);
857 60 : appendStringInfo(&querybuf, "DROP TABLE %s, %s", diffname, tempname);
858 60 : if (SPI_exec(querybuf.data, 0) != SPI_OK_UTILITY)
859 0 : elog(ERROR, "SPI_exec failed: %s", querybuf.data);
860 :
861 : /* Close SPI context. */
862 60 : if (SPI_finish() != SPI_OK_FINISH)
863 0 : elog(ERROR, "SPI_finish failed");
864 60 : }
865 :
866 : /*
867 : * Swap the physical files of the target and transient tables, then rebuild
868 : * the target's indexes and throw away the transient table. Security context
869 : * swapping is handled by the called function, so it is not needed here.
870 : */
871 : static void
872 132 : refresh_by_heap_swap(Oid matviewOid, Oid OIDNewHeap, char relpersistence)
873 : {
874 132 : finish_heap_swap(matviewOid, OIDNewHeap, false, false, true, true,
875 : RecentXmin, ReadNextMultiXactId(), relpersistence);
876 126 : }
877 :
878 : /*
879 : * Check whether specified index is usable for match merge.
880 : */
881 : static bool
882 156 : is_usable_unique_index(Relation indexRel)
883 : {
884 156 : Form_pg_index indexStruct = indexRel->rd_index;
885 :
886 : /*
887 : * Must be unique, valid, immediate, non-partial, and be defined over
888 : * plain user columns (not expressions). We also require it to be a
889 : * btree. Even if we had any other unique index kinds, we'd not know how
890 : * to identify the corresponding equality operator, nor could we be sure
891 : * that the planner could implement the required FULL JOIN with non-btree
892 : * operators.
893 : */
894 156 : if (indexStruct->indisunique &&
895 156 : indexStruct->indimmediate &&
896 156 : indexRel->rd_rel->relam == BTREE_AM_OID &&
897 312 : indexStruct->indisvalid &&
898 156 : RelationGetIndexPredicate(indexRel) == NIL &&
899 150 : indexStruct->indnatts > 0)
900 : {
901 : /*
902 : * The point of groveling through the index columns individually is to
903 : * reject both index expressions and system columns. Currently,
904 : * matviews couldn't have OID columns so there's no way to create an
905 : * index on a system column; but maybe someday that wouldn't be true,
906 : * so let's be safe.
907 : */
908 150 : int numatts = indexStruct->indnatts;
909 : int i;
910 :
911 326 : for (i = 0; i < numatts; i++)
912 : {
913 182 : int attnum = indexStruct->indkey.values[i];
914 :
915 182 : if (attnum <= 0)
916 6 : return false;
917 : }
918 144 : return true;
919 : }
920 6 : return false;
921 : }
922 :
923 :
924 : /*
925 : * This should be used to test whether the backend is in a context where it is
926 : * OK to allow DML statements to modify materialized views. We only want to
927 : * allow that for internal code driven by the materialized view definition,
928 : * not for arbitrary user-supplied code.
929 : *
930 : * While the function names reflect the fact that their main intended use is
931 : * incremental maintenance of materialized views (in response to changes to
932 : * the data in referenced relations), they are initially used to allow REFRESH
933 : * without blocking concurrent reads.
934 : */
935 : bool
936 120 : MatViewIncrementalMaintenanceIsEnabled(void)
937 : {
938 120 : return matview_maintenance_depth > 0;
939 : }
940 :
941 : static void
942 60 : OpenMatViewIncrementalMaintenance(void)
943 : {
944 60 : matview_maintenance_depth++;
945 60 : }
946 :
947 : static void
948 60 : CloseMatViewIncrementalMaintenance(void)
949 : {
950 60 : matview_maintenance_depth--;
951 : Assert(matview_maintenance_depth >= 0);
952 60 : }
|