Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * matview.c
4 : * materialized view support
5 : *
6 : * Portions Copyright (c) 1996-2023, PostgreSQL Global Development Group
7 : * Portions Copyright (c) 1994, Regents of the University of California
8 : *
9 : *
10 : * IDENTIFICATION
11 : * src/backend/commands/matview.c
12 : *
13 : *-------------------------------------------------------------------------
14 : */
15 : #include "postgres.h"
16 :
17 : #include "access/genam.h"
18 : #include "access/heapam.h"
19 : #include "access/htup_details.h"
20 : #include "access/multixact.h"
21 : #include "access/tableam.h"
22 : #include "access/xact.h"
23 : #include "access/xlog.h"
24 : #include "catalog/catalog.h"
25 : #include "catalog/indexing.h"
26 : #include "catalog/namespace.h"
27 : #include "catalog/pg_am.h"
28 : #include "catalog/pg_opclass.h"
29 : #include "catalog/pg_operator.h"
30 : #include "commands/cluster.h"
31 : #include "commands/matview.h"
32 : #include "commands/tablecmds.h"
33 : #include "commands/tablespace.h"
34 : #include "executor/executor.h"
35 : #include "executor/spi.h"
36 : #include "miscadmin.h"
37 : #include "parser/parse_relation.h"
38 : #include "pgstat.h"
39 : #include "rewrite/rewriteHandler.h"
40 : #include "storage/lmgr.h"
41 : #include "storage/smgr.h"
42 : #include "tcop/tcopprot.h"
43 : #include "utils/builtins.h"
44 : #include "utils/lsyscache.h"
45 : #include "utils/rel.h"
46 : #include "utils/snapmgr.h"
47 : #include "utils/syscache.h"
48 :
49 :
50 : typedef struct
51 : {
52 : DestReceiver pub; /* publicly-known function pointers */
53 : Oid transientoid; /* OID of new heap into which to store */
54 : /* These fields are filled by transientrel_startup: */
55 : Relation transientrel; /* relation to write to */
56 : CommandId output_cid; /* cmin to insert in output tuples */
57 : int ti_options; /* table_tuple_insert performance options */
58 : BulkInsertState bistate; /* bulk insert state */
59 : } DR_transientrel;
60 :
61 : static int matview_maintenance_depth = 0;
62 :
63 : static void transientrel_startup(DestReceiver *self, int operation, TupleDesc typeinfo);
64 : static bool transientrel_receive(TupleTableSlot *slot, DestReceiver *self);
65 : static void transientrel_shutdown(DestReceiver *self);
66 : static void transientrel_destroy(DestReceiver *self);
67 : static uint64 refresh_matview_datafill(DestReceiver *dest, Query *query,
68 : const char *queryString);
69 : static char *make_temptable_name_n(char *tempname, int n);
70 : static void refresh_by_match_merge(Oid matviewOid, Oid tempOid, Oid relowner,
71 : int save_sec_context);
72 : static void refresh_by_heap_swap(Oid matviewOid, Oid OIDNewHeap, char relpersistence);
73 : static bool is_usable_unique_index(Relation indexRel);
74 : static void OpenMatViewIncrementalMaintenance(void);
75 : static void CloseMatViewIncrementalMaintenance(void);
76 :
77 : /*
78 : * SetMatViewPopulatedState
79 : * Mark a materialized view as populated, or not.
80 : *
81 : * NOTE: caller must be holding an appropriate lock on the relation.
82 : */
83 : void
84 570 : SetMatViewPopulatedState(Relation relation, bool newstate)
85 : {
86 : Relation pgrel;
87 : HeapTuple tuple;
88 :
89 : Assert(relation->rd_rel->relkind == RELKIND_MATVIEW);
90 :
91 : /*
92 : * Update relation's pg_class entry. Crucial side-effect: other backends
93 : * (and this one too!) are sent SI message to make them rebuild relcache
94 : * entries.
95 : */
96 570 : pgrel = table_open(RelationRelationId, RowExclusiveLock);
97 570 : tuple = SearchSysCacheCopy1(RELOID,
98 : ObjectIdGetDatum(RelationGetRelid(relation)));
99 570 : if (!HeapTupleIsValid(tuple))
100 0 : elog(ERROR, "cache lookup failed for relation %u",
101 : RelationGetRelid(relation));
102 :
103 570 : ((Form_pg_class) GETSTRUCT(tuple))->relispopulated = newstate;
104 :
105 570 : CatalogTupleUpdate(pgrel, &tuple->t_self, tuple);
106 :
107 570 : heap_freetuple(tuple);
108 570 : table_close(pgrel, RowExclusiveLock);
109 :
110 : /*
111 : * Advance command counter to make the updated pg_class row locally
112 : * visible.
113 : */
114 570 : CommandCounterIncrement();
115 570 : }
116 :
117 : /*
118 : * ExecRefreshMatView -- execute a REFRESH MATERIALIZED VIEW command
119 : *
120 : * This refreshes the materialized view by creating a new table and swapping
121 : * the relfilenumbers of the new table and the old materialized view, so the OID
122 : * of the original materialized view is preserved. Thus we do not lose GRANT
123 : * nor references to this materialized view.
124 : *
125 : * If WITH NO DATA was specified, this is effectively like a TRUNCATE;
126 : * otherwise it is like a TRUNCATE followed by an INSERT using the SELECT
127 : * statement associated with the materialized view. The statement node's
128 : * skipData field shows whether the clause was used.
129 : *
130 : * Indexes are rebuilt too, via REINDEX. Since we are effectively bulk-loading
131 : * the new heap, it's better to create the indexes afterwards than to fill them
132 : * incrementally while we load.
133 : *
134 : * The matview's "populated" state is changed based on whether the contents
135 : * reflect the result set of the materialized view's query.
136 : */
137 : ObjectAddress
138 246 : ExecRefreshMatView(RefreshMatViewStmt *stmt, const char *queryString,
139 : ParamListInfo params, QueryCompletion *qc)
140 : {
141 : Oid matviewOid;
142 : Relation matviewRel;
143 : RewriteRule *rule;
144 : List *actions;
145 : Query *dataQuery;
146 : Oid tableSpace;
147 : Oid relowner;
148 : Oid OIDNewHeap;
149 : DestReceiver *dest;
150 246 : uint64 processed = 0;
151 : bool concurrent;
152 : LOCKMODE lockmode;
153 : char relpersistence;
154 : Oid save_userid;
155 : int save_sec_context;
156 : int save_nestlevel;
157 : ObjectAddress address;
158 :
159 : /* Determine strength of lock needed. */
160 246 : concurrent = stmt->concurrent;
161 246 : lockmode = concurrent ? ExclusiveLock : AccessExclusiveLock;
162 :
163 : /*
164 : * Get a lock until end of transaction.
165 : */
166 246 : matviewOid = RangeVarGetRelidExtended(stmt->relation,
167 : lockmode, 0,
168 : RangeVarCallbackMaintainsTable,
169 : NULL);
170 240 : matviewRel = table_open(matviewOid, NoLock);
171 240 : relowner = matviewRel->rd_rel->relowner;
172 :
173 : /*
174 : * Switch to the owner's userid, so that any functions are run as that
175 : * user. Also lock down security-restricted operations and arrange to
176 : * make GUC variable changes local to this command.
177 : */
178 240 : GetUserIdAndSecContext(&save_userid, &save_sec_context);
179 240 : SetUserIdAndSecContext(relowner,
180 : save_sec_context | SECURITY_RESTRICTED_OPERATION);
181 240 : save_nestlevel = NewGUCNestLevel();
182 :
183 : /* Make sure it is a materialized view. */
184 240 : if (matviewRel->rd_rel->relkind != RELKIND_MATVIEW)
185 0 : ereport(ERROR,
186 : (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
187 : errmsg("\"%s\" is not a materialized view",
188 : RelationGetRelationName(matviewRel))));
189 :
190 : /* Check that CONCURRENTLY is not specified if not populated. */
191 240 : if (concurrent && !RelationIsPopulated(matviewRel))
192 0 : ereport(ERROR,
193 : (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
194 : errmsg("CONCURRENTLY cannot be used when the materialized view is not populated")));
195 :
196 : /* Check that conflicting options have not been specified. */
197 240 : if (concurrent && stmt->skipData)
198 6 : ereport(ERROR,
199 : (errcode(ERRCODE_SYNTAX_ERROR),
200 : errmsg("%s and %s options cannot be used together",
201 : "CONCURRENTLY", "WITH NO DATA")));
202 :
203 : /*
204 : * Check that everything is correct for a refresh. Problems at this point
205 : * are internal errors, so elog is sufficient.
206 : */
207 234 : if (matviewRel->rd_rel->relhasrules == false ||
208 234 : matviewRel->rd_rules->numLocks < 1)
209 0 : elog(ERROR,
210 : "materialized view \"%s\" is missing rewrite information",
211 : RelationGetRelationName(matviewRel));
212 :
213 234 : if (matviewRel->rd_rules->numLocks > 1)
214 0 : elog(ERROR,
215 : "materialized view \"%s\" has too many rules",
216 : RelationGetRelationName(matviewRel));
217 :
218 234 : rule = matviewRel->rd_rules->rules[0];
219 234 : if (rule->event != CMD_SELECT || !(rule->isInstead))
220 0 : elog(ERROR,
221 : "the rule for materialized view \"%s\" is not a SELECT INSTEAD OF rule",
222 : RelationGetRelationName(matviewRel));
223 :
224 234 : actions = rule->actions;
225 234 : if (list_length(actions) != 1)
226 0 : elog(ERROR,
227 : "the rule for materialized view \"%s\" is not a single action",
228 : RelationGetRelationName(matviewRel));
229 :
230 : /*
231 : * Check that there is a unique index with no WHERE clause on one or more
232 : * columns of the materialized view if CONCURRENTLY is specified.
233 : */
234 234 : if (concurrent)
235 : {
236 72 : List *indexoidlist = RelationGetIndexList(matviewRel);
237 : ListCell *indexoidscan;
238 72 : bool hasUniqueIndex = false;
239 :
240 84 : foreach(indexoidscan, indexoidlist)
241 : {
242 78 : Oid indexoid = lfirst_oid(indexoidscan);
243 : Relation indexRel;
244 :
245 78 : indexRel = index_open(indexoid, AccessShareLock);
246 78 : hasUniqueIndex = is_usable_unique_index(indexRel);
247 78 : index_close(indexRel, AccessShareLock);
248 78 : if (hasUniqueIndex)
249 66 : break;
250 : }
251 :
252 72 : list_free(indexoidlist);
253 :
254 72 : if (!hasUniqueIndex)
255 6 : ereport(ERROR,
256 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
257 : errmsg("cannot refresh materialized view \"%s\" concurrently",
258 : quote_qualified_identifier(get_namespace_name(RelationGetNamespace(matviewRel)),
259 : RelationGetRelationName(matviewRel))),
260 : errhint("Create a unique index with no WHERE clause on one or more columns of the materialized view.")));
261 : }
262 :
263 : /*
264 : * The stored query was rewritten at the time of the MV definition, but
265 : * has not been scribbled on by the planner.
266 : */
267 228 : dataQuery = linitial_node(Query, actions);
268 :
269 : /*
270 : * Check for active uses of the relation in the current transaction, such
271 : * as open scans.
272 : *
273 : * NB: We count on this to protect us against problems with refreshing the
274 : * data using TABLE_INSERT_FROZEN.
275 : */
276 228 : CheckTableNotInUse(matviewRel, "REFRESH MATERIALIZED VIEW");
277 :
278 : /*
279 : * Tentatively mark the matview as populated or not (this will roll back
280 : * if we fail later).
281 : */
282 228 : SetMatViewPopulatedState(matviewRel, !stmt->skipData);
283 :
284 : /* Concurrent refresh builds new data in temp tablespace, and does diff. */
285 228 : if (concurrent)
286 : {
287 66 : tableSpace = GetDefaultTablespace(RELPERSISTENCE_TEMP, false);
288 66 : relpersistence = RELPERSISTENCE_TEMP;
289 : }
290 : else
291 : {
292 162 : tableSpace = matviewRel->rd_rel->reltablespace;
293 162 : relpersistence = matviewRel->rd_rel->relpersistence;
294 : }
295 :
296 : /*
297 : * Create the transient table that will receive the regenerated data. Lock
298 : * it against access by any other process until commit (by which time it
299 : * will be gone).
300 : */
301 456 : OIDNewHeap = make_new_heap(matviewOid, tableSpace,
302 228 : matviewRel->rd_rel->relam,
303 : relpersistence, ExclusiveLock);
304 228 : LockRelationOid(OIDNewHeap, AccessExclusiveLock);
305 228 : dest = CreateTransientRelDestReceiver(OIDNewHeap);
306 :
307 : /* Generate the data, if wanted. */
308 228 : if (!stmt->skipData)
309 228 : processed = refresh_matview_datafill(dest, dataQuery, queryString);
310 :
311 : /* Make the matview match the newly generated data. */
312 192 : if (concurrent)
313 : {
314 66 : int old_depth = matview_maintenance_depth;
315 :
316 66 : PG_TRY();
317 : {
318 66 : refresh_by_match_merge(matviewOid, OIDNewHeap, relowner,
319 : save_sec_context);
320 : }
321 6 : PG_CATCH();
322 : {
323 6 : matview_maintenance_depth = old_depth;
324 6 : PG_RE_THROW();
325 : }
326 60 : PG_END_TRY();
327 : Assert(matview_maintenance_depth == old_depth);
328 : }
329 : else
330 : {
331 126 : refresh_by_heap_swap(matviewOid, OIDNewHeap, relpersistence);
332 :
333 : /*
334 : * Inform cumulative stats system about our activity: basically, we
335 : * truncated the matview and inserted some new data. (The concurrent
336 : * code path above doesn't need to worry about this because the
337 : * inserts and deletes it issues get counted by lower-level code.)
338 : */
339 120 : pgstat_count_truncate(matviewRel);
340 120 : if (!stmt->skipData)
341 120 : pgstat_count_heap_insert(matviewRel, processed);
342 : }
343 :
344 180 : table_close(matviewRel, NoLock);
345 :
346 : /* Roll back any GUC changes */
347 180 : AtEOXact_GUC(false, save_nestlevel);
348 :
349 : /* Restore userid and security context */
350 180 : SetUserIdAndSecContext(save_userid, save_sec_context);
351 :
352 180 : ObjectAddressSet(address, RelationRelationId, matviewOid);
353 :
354 : /*
355 : * Save the rowcount so that pg_stat_statements can track the total number
356 : * of rows processed by REFRESH MATERIALIZED VIEW command. Note that we
357 : * still don't display the rowcount in the command completion tag output,
358 : * i.e., the display_rowcount flag of CMDTAG_REFRESH_MATERIALIZED_VIEW
359 : * command tag is left false in cmdtaglist.h. Otherwise, the change of
360 : * completion tag output might break applications using it.
361 : */
362 180 : if (qc)
363 180 : SetQueryCompletion(qc, CMDTAG_REFRESH_MATERIALIZED_VIEW, processed);
364 :
365 180 : return address;
366 : }
367 :
368 : /*
369 : * refresh_matview_datafill
370 : *
371 : * Execute the given query, sending result rows to "dest" (which will
372 : * insert them into the target matview).
373 : *
374 : * Returns number of rows inserted.
375 : */
376 : static uint64
377 228 : refresh_matview_datafill(DestReceiver *dest, Query *query,
378 : const char *queryString)
379 : {
380 : List *rewritten;
381 : PlannedStmt *plan;
382 : QueryDesc *queryDesc;
383 : Query *copied_query;
384 : uint64 processed;
385 :
386 : /* Lock and rewrite, using a copy to preserve the original query. */
387 228 : copied_query = copyObject(query);
388 228 : AcquireRewriteLocks(copied_query, true, false);
389 228 : rewritten = QueryRewrite(copied_query);
390 :
391 : /* SELECT should never rewrite to more or less than one SELECT query */
392 228 : if (list_length(rewritten) != 1)
393 0 : elog(ERROR, "unexpected rewrite result for REFRESH MATERIALIZED VIEW");
394 228 : query = (Query *) linitial(rewritten);
395 :
396 : /* Check for user-requested abort. */
397 228 : CHECK_FOR_INTERRUPTS();
398 :
399 : /* Plan the query which will generate data for the refresh. */
400 228 : plan = pg_plan_query(query, queryString, CURSOR_OPT_PARALLEL_OK, NULL);
401 :
402 : /*
403 : * Use a snapshot with an updated command ID to ensure this query sees
404 : * results of any previously executed queries. (This could only matter if
405 : * the planner executed an allegedly-stable function that changed the
406 : * database contents, but let's do it anyway to be safe.)
407 : */
408 222 : PushCopiedSnapshot(GetActiveSnapshot());
409 222 : UpdateActiveSnapshotCommandId();
410 :
411 : /* Create a QueryDesc, redirecting output to our tuple receiver */
412 222 : queryDesc = CreateQueryDesc(plan, queryString,
413 : GetActiveSnapshot(), InvalidSnapshot,
414 : dest, NULL, NULL, 0);
415 :
416 : /* call ExecutorStart to prepare the plan for execution */
417 222 : ExecutorStart(queryDesc, 0);
418 :
419 : /* run the plan */
420 222 : ExecutorRun(queryDesc, ForwardScanDirection, 0, true);
421 :
422 192 : processed = queryDesc->estate->es_processed;
423 :
424 : /* and clean up */
425 192 : ExecutorFinish(queryDesc);
426 192 : ExecutorEnd(queryDesc);
427 :
428 192 : FreeQueryDesc(queryDesc);
429 :
430 192 : PopActiveSnapshot();
431 :
432 192 : return processed;
433 : }
434 :
435 : DestReceiver *
436 228 : CreateTransientRelDestReceiver(Oid transientoid)
437 : {
438 228 : DR_transientrel *self = (DR_transientrel *) palloc0(sizeof(DR_transientrel));
439 :
440 228 : self->pub.receiveSlot = transientrel_receive;
441 228 : self->pub.rStartup = transientrel_startup;
442 228 : self->pub.rShutdown = transientrel_shutdown;
443 228 : self->pub.rDestroy = transientrel_destroy;
444 228 : self->pub.mydest = DestTransientRel;
445 228 : self->transientoid = transientoid;
446 :
447 228 : return (DestReceiver *) self;
448 : }
449 :
450 : /*
451 : * transientrel_startup --- executor startup
452 : */
453 : static void
454 222 : transientrel_startup(DestReceiver *self, int operation, TupleDesc typeinfo)
455 : {
456 222 : DR_transientrel *myState = (DR_transientrel *) self;
457 : Relation transientrel;
458 :
459 222 : transientrel = table_open(myState->transientoid, NoLock);
460 :
461 : /*
462 : * Fill private fields of myState for use by later routines
463 : */
464 222 : myState->transientrel = transientrel;
465 222 : myState->output_cid = GetCurrentCommandId(true);
466 222 : myState->ti_options = TABLE_INSERT_SKIP_FSM | TABLE_INSERT_FROZEN;
467 222 : myState->bistate = GetBulkInsertState();
468 :
469 : /*
470 : * Valid smgr_targblock implies something already wrote to the relation.
471 : * This may be harmless, but this function hasn't planned for it.
472 : */
473 : Assert(RelationGetTargetBlock(transientrel) == InvalidBlockNumber);
474 222 : }
475 :
476 : /*
477 : * transientrel_receive --- receive one tuple
478 : */
479 : static bool
480 568 : transientrel_receive(TupleTableSlot *slot, DestReceiver *self)
481 : {
482 568 : DR_transientrel *myState = (DR_transientrel *) self;
483 :
484 : /*
485 : * Note that the input slot might not be of the type of the target
486 : * relation. That's supported by table_tuple_insert(), but slightly less
487 : * efficient than inserting with the right slot - but the alternative
488 : * would be to copy into a slot of the right type, which would not be
489 : * cheap either. This also doesn't allow accessing per-AM data (say a
490 : * tuple's xmin), but since we don't do that here...
491 : */
492 :
493 568 : table_tuple_insert(myState->transientrel,
494 : slot,
495 : myState->output_cid,
496 : myState->ti_options,
497 : myState->bistate);
498 :
499 : /* We know this is a newly created relation, so there are no indexes */
500 :
501 568 : return true;
502 : }
503 :
504 : /*
505 : * transientrel_shutdown --- executor end
506 : */
507 : static void
508 192 : transientrel_shutdown(DestReceiver *self)
509 : {
510 192 : DR_transientrel *myState = (DR_transientrel *) self;
511 :
512 192 : FreeBulkInsertState(myState->bistate);
513 :
514 192 : table_finish_bulk_insert(myState->transientrel, myState->ti_options);
515 :
516 : /* close transientrel, but keep lock until commit */
517 192 : table_close(myState->transientrel, NoLock);
518 192 : myState->transientrel = NULL;
519 192 : }
520 :
521 : /*
522 : * transientrel_destroy --- release DestReceiver object
523 : */
524 : static void
525 0 : transientrel_destroy(DestReceiver *self)
526 : {
527 0 : pfree(self);
528 0 : }
529 :
530 :
531 : /*
532 : * Given a qualified temporary table name, append an underscore followed by
533 : * the given integer, to make a new table name based on the old one.
534 : * The result is a palloc'd string.
535 : *
536 : * As coded, this would fail to make a valid SQL name if the given name were,
537 : * say, "FOO"."BAR". Currently, the table name portion of the input will
538 : * never be double-quoted because it's of the form "pg_temp_NNN", cf
539 : * make_new_heap(). But we might have to work harder someday.
540 : */
541 : static char *
542 66 : make_temptable_name_n(char *tempname, int n)
543 : {
544 : StringInfoData namebuf;
545 :
546 66 : initStringInfo(&namebuf);
547 66 : appendStringInfoString(&namebuf, tempname);
548 66 : appendStringInfo(&namebuf, "_%d", n);
549 66 : return namebuf.data;
550 : }
551 :
552 : /*
553 : * refresh_by_match_merge
554 : *
555 : * Refresh a materialized view with transactional semantics, while allowing
556 : * concurrent reads.
557 : *
558 : * This is called after a new version of the data has been created in a
559 : * temporary table. It performs a full outer join against the old version of
560 : * the data, producing "diff" results. This join cannot work if there are any
561 : * duplicated rows in either the old or new versions, in the sense that every
562 : * column would compare as equal between the two rows. It does work correctly
563 : * in the face of rows which have at least one NULL value, with all non-NULL
564 : * columns equal. The behavior of NULLs on equality tests and on UNIQUE
565 : * indexes turns out to be quite convenient here; the tests we need to make
566 : * are consistent with default behavior. If there is at least one UNIQUE
567 : * index on the materialized view, we have exactly the guarantee we need.
568 : *
569 : * The temporary table used to hold the diff results contains just the TID of
570 : * the old record (if matched) and the ROW from the new table as a single
571 : * column of complex record type (if matched).
572 : *
573 : * Once we have the diff table, we perform set-based DELETE and INSERT
574 : * operations against the materialized view, and discard both temporary
575 : * tables.
576 : *
577 : * Everything from the generation of the new data to applying the differences
578 : * takes place under cover of an ExclusiveLock, since it seems as though we
579 : * would want to prohibit not only concurrent REFRESH operations, but also
580 : * incremental maintenance. It also doesn't seem reasonable or safe to allow
581 : * SELECT FOR UPDATE or SELECT FOR SHARE on rows being updated or deleted by
582 : * this command.
583 : */
584 : static void
585 66 : refresh_by_match_merge(Oid matviewOid, Oid tempOid, Oid relowner,
586 : int save_sec_context)
587 : {
588 : StringInfoData querybuf;
589 : Relation matviewRel;
590 : Relation tempRel;
591 : char *matviewname;
592 : char *tempname;
593 : char *diffname;
594 : TupleDesc tupdesc;
595 : bool foundUniqueIndex;
596 : List *indexoidlist;
597 : ListCell *indexoidscan;
598 : int16 relnatts;
599 : Oid *opUsedForQual;
600 :
601 66 : initStringInfo(&querybuf);
602 66 : matviewRel = table_open(matviewOid, NoLock);
603 66 : matviewname = quote_qualified_identifier(get_namespace_name(RelationGetNamespace(matviewRel)),
604 66 : RelationGetRelationName(matviewRel));
605 66 : tempRel = table_open(tempOid, NoLock);
606 66 : tempname = quote_qualified_identifier(get_namespace_name(RelationGetNamespace(tempRel)),
607 66 : RelationGetRelationName(tempRel));
608 66 : diffname = make_temptable_name_n(tempname, 2);
609 :
610 66 : relnatts = RelationGetNumberOfAttributes(matviewRel);
611 :
612 : /* Open SPI context. */
613 66 : if (SPI_connect() != SPI_OK_CONNECT)
614 0 : elog(ERROR, "SPI_connect failed");
615 :
616 : /* Analyze the temp table with the new contents. */
617 66 : appendStringInfo(&querybuf, "ANALYZE %s", tempname);
618 66 : if (SPI_exec(querybuf.data, 0) != SPI_OK_UTILITY)
619 0 : elog(ERROR, "SPI_exec failed: %s", querybuf.data);
620 :
621 : /*
622 : * We need to ensure that there are not duplicate rows without NULLs in
623 : * the new data set before we can count on the "diff" results. Check for
624 : * that in a way that allows showing the first duplicated row found. Even
625 : * after we pass this test, a unique index on the materialized view may
626 : * find a duplicate key problem.
627 : *
628 : * Note: here and below, we use "tablename.*::tablerowtype" as a hack to
629 : * keep ".*" from being expanded into multiple columns in a SELECT list.
630 : * Compare ruleutils.c's get_variable().
631 : */
632 66 : resetStringInfo(&querybuf);
633 66 : appendStringInfo(&querybuf,
634 : "SELECT newdata.*::%s FROM %s newdata "
635 : "WHERE newdata.* IS NOT NULL AND EXISTS "
636 : "(SELECT 1 FROM %s newdata2 WHERE newdata2.* IS NOT NULL "
637 : "AND newdata2.* OPERATOR(pg_catalog.*=) newdata.* "
638 : "AND newdata2.ctid OPERATOR(pg_catalog.<>) "
639 : "newdata.ctid)",
640 : tempname, tempname, tempname);
641 66 : if (SPI_execute(querybuf.data, false, 1) != SPI_OK_SELECT)
642 0 : elog(ERROR, "SPI_exec failed: %s", querybuf.data);
643 66 : if (SPI_processed > 0)
644 : {
645 : /*
646 : * Note that this ereport() is returning data to the user. Generally,
647 : * we would want to make sure that the user has been granted access to
648 : * this data. However, REFRESH MAT VIEW is only able to be run by the
649 : * owner of the mat view (or a superuser) and therefore there is no
650 : * need to check for access to data in the mat view.
651 : */
652 6 : ereport(ERROR,
653 : (errcode(ERRCODE_CARDINALITY_VIOLATION),
654 : errmsg("new data for materialized view \"%s\" contains duplicate rows without any null columns",
655 : RelationGetRelationName(matviewRel)),
656 : errdetail("Row: %s",
657 : SPI_getvalue(SPI_tuptable->vals[0], SPI_tuptable->tupdesc, 1))));
658 : }
659 :
660 60 : SetUserIdAndSecContext(relowner,
661 : save_sec_context | SECURITY_LOCAL_USERID_CHANGE);
662 :
663 : /* Start building the query for creating the diff table. */
664 60 : resetStringInfo(&querybuf);
665 60 : appendStringInfo(&querybuf,
666 : "CREATE TEMP TABLE %s AS "
667 : "SELECT mv.ctid AS tid, newdata.*::%s AS newdata "
668 : "FROM %s mv FULL JOIN %s newdata ON (",
669 : diffname, tempname, matviewname, tempname);
670 :
671 : /*
672 : * Get the list of index OIDs for the table from the relcache, and look up
673 : * each one in the pg_index syscache. We will test for equality on all
674 : * columns present in all unique indexes which only reference columns and
675 : * include all rows.
676 : */
677 60 : tupdesc = matviewRel->rd_att;
678 60 : opUsedForQual = (Oid *) palloc0(sizeof(Oid) * relnatts);
679 60 : foundUniqueIndex = false;
680 :
681 60 : indexoidlist = RelationGetIndexList(matviewRel);
682 :
683 132 : foreach(indexoidscan, indexoidlist)
684 : {
685 72 : Oid indexoid = lfirst_oid(indexoidscan);
686 : Relation indexRel;
687 :
688 72 : indexRel = index_open(indexoid, RowExclusiveLock);
689 72 : if (is_usable_unique_index(indexRel))
690 : {
691 72 : Form_pg_index indexStruct = indexRel->rd_index;
692 72 : int indnkeyatts = indexStruct->indnkeyatts;
693 : oidvector *indclass;
694 : Datum indclassDatum;
695 : int i;
696 :
697 : /* Must get indclass the hard way. */
698 72 : indclassDatum = SysCacheGetAttrNotNull(INDEXRELID,
699 72 : indexRel->rd_indextuple,
700 : Anum_pg_index_indclass);
701 72 : indclass = (oidvector *) DatumGetPointer(indclassDatum);
702 :
703 : /* Add quals for all columns from this index. */
704 160 : for (i = 0; i < indnkeyatts; i++)
705 : {
706 88 : int attnum = indexStruct->indkey.values[i];
707 88 : Oid opclass = indclass->values[i];
708 88 : Form_pg_attribute attr = TupleDescAttr(tupdesc, attnum - 1);
709 88 : Oid attrtype = attr->atttypid;
710 : HeapTuple cla_ht;
711 : Form_pg_opclass cla_tup;
712 : Oid opfamily;
713 : Oid opcintype;
714 : Oid op;
715 : const char *leftop;
716 : const char *rightop;
717 :
718 : /*
719 : * Identify the equality operator associated with this index
720 : * column. First we need to look up the column's opclass.
721 : */
722 88 : cla_ht = SearchSysCache1(CLAOID, ObjectIdGetDatum(opclass));
723 88 : if (!HeapTupleIsValid(cla_ht))
724 0 : elog(ERROR, "cache lookup failed for opclass %u", opclass);
725 88 : cla_tup = (Form_pg_opclass) GETSTRUCT(cla_ht);
726 : Assert(cla_tup->opcmethod == BTREE_AM_OID);
727 88 : opfamily = cla_tup->opcfamily;
728 88 : opcintype = cla_tup->opcintype;
729 88 : ReleaseSysCache(cla_ht);
730 :
731 88 : op = get_opfamily_member(opfamily, opcintype, opcintype,
732 : BTEqualStrategyNumber);
733 88 : if (!OidIsValid(op))
734 0 : elog(ERROR, "missing operator %d(%u,%u) in opfamily %u",
735 : BTEqualStrategyNumber, opcintype, opcintype, opfamily);
736 :
737 : /*
738 : * If we find the same column with the same equality semantics
739 : * in more than one index, we only need to emit the equality
740 : * clause once.
741 : *
742 : * Since we only remember the last equality operator, this
743 : * code could be fooled into emitting duplicate clauses given
744 : * multiple indexes with several different opclasses ... but
745 : * that's so unlikely it doesn't seem worth spending extra
746 : * code to avoid.
747 : */
748 88 : if (opUsedForQual[attnum - 1] == op)
749 0 : continue;
750 88 : opUsedForQual[attnum - 1] = op;
751 :
752 : /*
753 : * Actually add the qual, ANDed with any others.
754 : */
755 88 : if (foundUniqueIndex)
756 28 : appendStringInfoString(&querybuf, " AND ");
757 :
758 88 : leftop = quote_qualified_identifier("newdata",
759 88 : NameStr(attr->attname));
760 88 : rightop = quote_qualified_identifier("mv",
761 88 : NameStr(attr->attname));
762 :
763 88 : generate_operator_clause(&querybuf,
764 : leftop, attrtype,
765 : op,
766 : rightop, attrtype);
767 :
768 88 : foundUniqueIndex = true;
769 : }
770 : }
771 :
772 : /* Keep the locks, since we're about to run DML which needs them. */
773 72 : index_close(indexRel, NoLock);
774 : }
775 :
776 60 : list_free(indexoidlist);
777 :
778 : /*
779 : * There must be at least one usable unique index on the matview.
780 : *
781 : * ExecRefreshMatView() checks that after taking the exclusive lock on the
782 : * matview. So at least one unique index is guaranteed to exist here
783 : * because the lock is still being held; so an Assert seems sufficient.
784 : */
785 : Assert(foundUniqueIndex);
786 :
787 60 : appendStringInfoString(&querybuf,
788 : " AND newdata.* OPERATOR(pg_catalog.*=) mv.*) "
789 : "WHERE newdata.* IS NULL OR mv.* IS NULL "
790 : "ORDER BY tid");
791 :
792 : /* Create the temporary "diff" table. */
793 60 : if (SPI_exec(querybuf.data, 0) != SPI_OK_UTILITY)
794 0 : elog(ERROR, "SPI_exec failed: %s", querybuf.data);
795 :
796 60 : SetUserIdAndSecContext(relowner,
797 : save_sec_context | SECURITY_RESTRICTED_OPERATION);
798 :
799 : /*
800 : * We have no further use for data from the "full-data" temp table, but we
801 : * must keep it around because its type is referenced from the diff table.
802 : */
803 :
804 : /* Analyze the diff table. */
805 60 : resetStringInfo(&querybuf);
806 60 : appendStringInfo(&querybuf, "ANALYZE %s", diffname);
807 60 : if (SPI_exec(querybuf.data, 0) != SPI_OK_UTILITY)
808 0 : elog(ERROR, "SPI_exec failed: %s", querybuf.data);
809 :
810 60 : OpenMatViewIncrementalMaintenance();
811 :
812 : /* Deletes must come before inserts; do them first. */
813 60 : resetStringInfo(&querybuf);
814 60 : appendStringInfo(&querybuf,
815 : "DELETE FROM %s mv WHERE ctid OPERATOR(pg_catalog.=) ANY "
816 : "(SELECT diff.tid FROM %s diff "
817 : "WHERE diff.tid IS NOT NULL "
818 : "AND diff.newdata IS NULL)",
819 : matviewname, diffname);
820 60 : if (SPI_exec(querybuf.data, 0) != SPI_OK_DELETE)
821 0 : elog(ERROR, "SPI_exec failed: %s", querybuf.data);
822 :
823 : /* Inserts go last. */
824 60 : resetStringInfo(&querybuf);
825 60 : appendStringInfo(&querybuf,
826 : "INSERT INTO %s SELECT (diff.newdata).* "
827 : "FROM %s diff WHERE tid IS NULL",
828 : matviewname, diffname);
829 60 : if (SPI_exec(querybuf.data, 0) != SPI_OK_INSERT)
830 0 : elog(ERROR, "SPI_exec failed: %s", querybuf.data);
831 :
832 : /* We're done maintaining the materialized view. */
833 60 : CloseMatViewIncrementalMaintenance();
834 60 : table_close(tempRel, NoLock);
835 60 : table_close(matviewRel, NoLock);
836 :
837 : /* Clean up temp tables. */
838 60 : resetStringInfo(&querybuf);
839 60 : appendStringInfo(&querybuf, "DROP TABLE %s, %s", diffname, tempname);
840 60 : if (SPI_exec(querybuf.data, 0) != SPI_OK_UTILITY)
841 0 : elog(ERROR, "SPI_exec failed: %s", querybuf.data);
842 :
843 : /* Close SPI context. */
844 60 : if (SPI_finish() != SPI_OK_FINISH)
845 0 : elog(ERROR, "SPI_finish failed");
846 60 : }
847 :
848 : /*
849 : * Swap the physical files of the target and transient tables, then rebuild
850 : * the target's indexes and throw away the transient table. Security context
851 : * swapping is handled by the called function, so it is not needed here.
852 : */
853 : static void
854 126 : refresh_by_heap_swap(Oid matviewOid, Oid OIDNewHeap, char relpersistence)
855 : {
856 126 : finish_heap_swap(matviewOid, OIDNewHeap, false, false, true, true,
857 : RecentXmin, ReadNextMultiXactId(), relpersistence);
858 120 : }
859 :
860 : /*
861 : * Check whether specified index is usable for match merge.
862 : */
863 : static bool
864 150 : is_usable_unique_index(Relation indexRel)
865 : {
866 150 : Form_pg_index indexStruct = indexRel->rd_index;
867 :
868 : /*
869 : * Must be unique, valid, immediate, non-partial, and be defined over
870 : * plain user columns (not expressions). We also require it to be a
871 : * btree. Even if we had any other unique index kinds, we'd not know how
872 : * to identify the corresponding equality operator, nor could we be sure
873 : * that the planner could implement the required FULL JOIN with non-btree
874 : * operators.
875 : */
876 150 : if (indexStruct->indisunique &&
877 150 : indexStruct->indimmediate &&
878 150 : indexRel->rd_rel->relam == BTREE_AM_OID &&
879 300 : indexStruct->indisvalid &&
880 150 : RelationGetIndexPredicate(indexRel) == NIL &&
881 144 : indexStruct->indnatts > 0)
882 : {
883 : /*
884 : * The point of groveling through the index columns individually is to
885 : * reject both index expressions and system columns. Currently,
886 : * matviews couldn't have OID columns so there's no way to create an
887 : * index on a system column; but maybe someday that wouldn't be true,
888 : * so let's be safe.
889 : */
890 144 : int numatts = indexStruct->indnatts;
891 : int i;
892 :
893 314 : for (i = 0; i < numatts; i++)
894 : {
895 176 : int attnum = indexStruct->indkey.values[i];
896 :
897 176 : if (attnum <= 0)
898 6 : return false;
899 : }
900 138 : return true;
901 : }
902 6 : return false;
903 : }
904 :
905 :
906 : /*
907 : * This should be used to test whether the backend is in a context where it is
908 : * OK to allow DML statements to modify materialized views. We only want to
909 : * allow that for internal code driven by the materialized view definition,
910 : * not for arbitrary user-supplied code.
911 : *
912 : * While the function names reflect the fact that their main intended use is
913 : * incremental maintenance of materialized views (in response to changes to
914 : * the data in referenced relations), they are initially used to allow REFRESH
915 : * without blocking concurrent reads.
916 : */
917 : bool
918 120 : MatViewIncrementalMaintenanceIsEnabled(void)
919 : {
920 120 : return matview_maintenance_depth > 0;
921 : }
922 :
923 : static void
924 60 : OpenMatViewIncrementalMaintenance(void)
925 : {
926 60 : matview_maintenance_depth++;
927 60 : }
928 :
929 : static void
930 60 : CloseMatViewIncrementalMaintenance(void)
931 : {
932 60 : matview_maintenance_depth--;
933 : Assert(matview_maintenance_depth >= 0);
934 60 : }
|