Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * nodeMemoize.c
4 : * Routines to handle caching of results from parameterized nodes
5 : *
6 : * Portions Copyright (c) 2021-2023, PostgreSQL Global Development Group
7 : * Portions Copyright (c) 1994, Regents of the University of California
8 : *
9 : *
10 : * IDENTIFICATION
11 : * src/backend/executor/nodeMemoize.c
12 : *
13 : * Memoize nodes are intended to sit above parameterized nodes in the plan
14 : * tree in order to cache results from them. The intention here is that a
15 : * repeat scan with a parameter value that has already been seen by the node
16 : * can fetch tuples from the cache rather than having to re-scan the outer
17 : * node all over again. The query planner may choose to make use of one of
18 : * these when it thinks rescans for previously seen values are likely enough
19 : * to warrant adding the additional node.
20 : *
21              : * The cache is implemented as a hash table. When the cache fills, we never
22              : * spill tuples to disk; instead, we evict the least recently used cache
23              : * entry. We track recency by always pushing new entries, and entries we
24              : * look up, onto the tail of a doubly linked list. This means that the
25              : * least recently used items accumulate at the head of this LRU list,
26              : * which is where eviction starts.
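 *
 * As a minimal sketch (using the dlist API from lib/ilist.h, exactly as the
 * code below does), the whole LRU discipline amounts to:
 *
 *     dlist_push_tail(&mstate->lru_list, &key->lru_node);   new entry
 *     dlist_move_tail(&mstate->lru_list, &key->lru_node);   cache hit
 *     dlist_foreach_modify(iter, &mstate->lru_list) ...     evict from the
 *                                                           head while over
 *                                                           the memory budget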
27 : *
28              : * Sometimes our callers won't run their scans to completion. For example, a
29 : * semi-join only needs to run until it finds a matching tuple, and once it
30 : * does, the join operator skips to the next outer tuple and does not execute
31 : * the inner side again on that scan. Because of this, we must keep track of
32 : * when a cache entry is complete, and by default, we know it is when we run
33 : * out of tuples to read during the scan. However, there are cases where we
34 : * can mark the cache entry as complete without exhausting the scan of all
35 : * tuples. One case is unique joins, where the join operator knows that there
36 : * will only be at most one match for any given outer tuple. In order to
37 : * support such cases we allow the "singlerow" option to be set for the cache.
38 : * This option marks the cache entry as complete after we read the first tuple
39 : * from the subnode.
40 : *
41 : * It's possible when we're filling the cache for a given set of parameters
42 : * that we're unable to free enough memory to store any more tuples. If this
43 : * happens then we'll have already evicted all other cache entries. When
44 : * caching another tuple would cause us to exceed our memory budget, we must
45 : * free the entry that we're currently populating and move the state machine
46 : * into MEMO_CACHE_BYPASS_MODE. This means that we'll not attempt to cache
47              : * any further tuples for this particular scan, since we don't have the
48              : * memory for it. The state machine will be reset again on the next rescan. If the
49 : * memory requirements to cache the next parameter's tuples are less
50 : * demanding, then that may allow us to start putting useful entries back into
51 : * the cache again.
52 : *
53 : *
54 : * INTERFACE ROUTINES
55 : * ExecMemoize - lookup cache, exec subplan when not found
56 : * ExecInitMemoize - initialize node and subnodes
57 : * ExecEndMemoize - shutdown node and subnodes
58 : * ExecReScanMemoize - rescan the memoize node
59 : *
60 : * ExecMemoizeEstimate estimates DSM space needed for parallel plan
61 : * ExecMemoizeInitializeDSM initialize DSM for parallel plan
62 : * ExecMemoizeInitializeWorker attach to DSM info in parallel worker
63 : * ExecMemoizeRetrieveInstrumentation get instrumentation from worker
64 : *-------------------------------------------------------------------------
65 : */
66 :
67 : #include "postgres.h"
68 :
69 : #include "common/hashfn.h"
70 : #include "executor/executor.h"
71 : #include "executor/nodeMemoize.h"
72 : #include "lib/ilist.h"
73 : #include "miscadmin.h"
74 : #include "utils/datum.h"
75 : #include "utils/lsyscache.h"
76 :
77 : /* States of the ExecMemoize state machine */
78 : #define MEMO_CACHE_LOOKUP 1 /* Attempt to perform a cache lookup */
79 : #define MEMO_CACHE_FETCH_NEXT_TUPLE 2 /* Get another tuple from the cache */
80 : #define MEMO_FILLING_CACHE 3 /* Read outer node to fill cache */
81 : #define MEMO_CACHE_BYPASS_MODE 4 /* Bypass mode. Just read from our
82 : * subplan without caching anything */
83 : #define MEMO_END_OF_SCAN 5 /* Ready for rescan */
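
/*
 * A sketch of the state transitions, as implemented by ExecMemoize() below
 * (ExecReScanMemoize() resets the machine back to MEMO_CACHE_LOOKUP):
 *
 *   MEMO_CACHE_LOOKUP  -> MEMO_CACHE_FETCH_NEXT_TUPLE  cache hit with tuples
 *   MEMO_CACHE_LOOKUP  -> MEMO_FILLING_CACHE           cache miss, outer tuple read
 *   MEMO_CACHE_LOOKUP  -> MEMO_CACHE_BYPASS_MODE       tuple could not be cached
 *   MEMO_FILLING_CACHE -> MEMO_CACHE_BYPASS_MODE       tuple could not be cached
 *   any of the above   -> MEMO_END_OF_SCAN             tuples exhausted
 */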
84 :
85 :
86 : /* Helper macros for memory accounting */
87 : #define EMPTY_ENTRY_MEMORY_BYTES(e) (sizeof(MemoizeEntry) + \
88 : sizeof(MemoizeKey) + \
89              : (e)->key->params->t_len)
90 : #define CACHE_TUPLE_BYTES(t) (sizeof(MemoizeTuple) + \
91 : (t)->mintuple->t_len)
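
/*
 * For illustration: the memory charged against mem_used for a cache entry
 * holding N tuples is
 *
 *     EMPTY_ENTRY_MEMORY_BYTES(entry) + CACHE_TUPLE_BYTES(tuple), summed
 *     over the entry's N tuples.
 *
 * cache_lookup() and cache_store_tuple() add these amounts, and
 * remove_cache_entry() and entry_purge_tuples() subtract them again.
 */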
92 :
93              : /* MemoizeTuple: stores an individually cached tuple */
94 : typedef struct MemoizeTuple
95 : {
96 : MinimalTuple mintuple; /* Cached tuple */
97 : struct MemoizeTuple *next; /* The next tuple with the same parameter
98 : * values or NULL if it's the last one */
99 : } MemoizeTuple;
100 :
101 : /*
102 : * MemoizeKey
103 : * The hash table key for cached entries plus the LRU list link
104 : */
105 : typedef struct MemoizeKey
106 : {
107 : MinimalTuple params;
108 : dlist_node lru_node; /* Pointer to next/prev key in LRU list */
109 : } MemoizeKey;
110 :
111 : /*
112 : * MemoizeEntry
113 : * The data struct that the cache hash table stores
114 : */
115 : typedef struct MemoizeEntry
116 : {
117 : MemoizeKey *key; /* Hash key for hash table lookups */
118 : MemoizeTuple *tuplehead; /* Pointer to the first tuple or NULL if no
119 : * tuples are cached for this entry */
120 : uint32 hash; /* Hash value (cached) */
121 : char status; /* Hash status */
122 : bool complete; /* Did we read the outer plan to completion? */
123 : } MemoizeEntry;
124 :
125 :
126 : #define SH_PREFIX memoize
127 : #define SH_ELEMENT_TYPE MemoizeEntry
128 : #define SH_KEY_TYPE MemoizeKey *
129 : #define SH_SCOPE static inline
130 : #define SH_DECLARE
131 : #include "lib/simplehash.h"
132 :
133 : static uint32 MemoizeHash_hash(struct memoize_hash *tb,
134 : const MemoizeKey *key);
135 : static bool MemoizeHash_equal(struct memoize_hash *tb,
136 : const MemoizeKey *key1,
137 : const MemoizeKey *key2);
138 :
139 : #define SH_PREFIX memoize
140 : #define SH_ELEMENT_TYPE MemoizeEntry
141 : #define SH_KEY_TYPE MemoizeKey *
142 : #define SH_KEY key
143 : #define SH_HASH_KEY(tb, key) MemoizeHash_hash(tb, key)
144 : #define SH_EQUAL(tb, a, b) MemoizeHash_equal(tb, a, b)
145 : #define SH_SCOPE static inline
146 : #define SH_STORE_HASH
147 : #define SH_GET_HASH(tb, a) a->hash
148 : #define SH_DEFINE
149 : #include "lib/simplehash.h"
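
/*
 * For reference, a sketch of the API generated by the two simplehash.h
 * inclusions above for SH_PREFIX "memoize" (signatures paraphrased from
 * lib/simplehash.h, which remains the authoritative source):
 *
 *   static inline memoize_hash *memoize_create(MemoryContext ctx,
 *                                              uint32 nelements,
 *                                              void *private_data);
 *   static inline MemoizeEntry *memoize_insert(memoize_hash *tb,
 *                                              MemoizeKey *key, bool *found);
 *   static inline MemoizeEntry *memoize_lookup(memoize_hash *tb,
 *                                              MemoizeKey *key);
 *   static inline void memoize_delete_item(memoize_hash *tb,
 *                                          MemoizeEntry *entry);
 *   static inline void memoize_start_iterate(memoize_hash *tb,
 *                                            memoize_iterator *iter);
 *   static inline MemoizeEntry *memoize_iterate(memoize_hash *tb,
 *                                               memoize_iterator *iter);
 */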
150 :
151 : /*
152 : * MemoizeHash_hash
153 : * Hash function for simplehash hashtable. 'key' is unused here as we
154 : * require that all table lookups first populate the MemoizeState's
155 : * probeslot with the key values to be looked up.
156 : */
157 : static uint32
158 457832 : MemoizeHash_hash(struct memoize_hash *tb, const MemoizeKey *key)
159 : {
160 457832 : MemoizeState *mstate = (MemoizeState *) tb->private_data;
161 457832 : ExprContext *econtext = mstate->ss.ps.ps_ExprContext;
162 : MemoryContext oldcontext;
163 457832 : TupleTableSlot *pslot = mstate->probeslot;
164 457832 : uint32 hashkey = 0;
165 457832 : int numkeys = mstate->nkeys;
166 :
167 457832 : oldcontext = MemoryContextSwitchTo(econtext->ecxt_per_tuple_memory);
168 :
169 457832 : if (mstate->binary_mode)
170 : {
171 159346 : for (int i = 0; i < numkeys; i++)
172 : {
173 : /* combine successive hashkeys by rotating */
174 81954 : hashkey = pg_rotate_left32(hashkey, 1);
175 :
176 81954 : if (!pslot->tts_isnull[i]) /* treat nulls as having hash key 0 */
177 : {
178 : FormData_pg_attribute *attr;
179 : uint32 hkey;
180 :
181 81954 : attr = &pslot->tts_tupleDescriptor->attrs[i];
182 :
183 81954 : hkey = datum_image_hash(pslot->tts_values[i], attr->attbyval, attr->attlen);
184 :
185 81954 : hashkey ^= hkey;
186 : }
187 : }
188 : }
189 : else
190 : {
191 380440 : FmgrInfo *hashfunctions = mstate->hashfunctions;
192 380440 : Oid *collations = mstate->collations;
193 :
194 760880 : for (int i = 0; i < numkeys; i++)
195 : {
196 : /* combine successive hashkeys by rotating */
197 380440 : hashkey = pg_rotate_left32(hashkey, 1);
198 :
199 380440 : if (!pslot->tts_isnull[i]) /* treat nulls as having hash key 0 */
200 : {
201 : uint32 hkey;
202 :
203 380066 : hkey = DatumGetUInt32(FunctionCall1Coll(&hashfunctions[i],
204 380066 : collations[i], pslot->tts_values[i]));
205 380066 : hashkey ^= hkey;
206 : }
207 : }
208 : }
209 :
210 457832 : ResetExprContext(econtext);
211 457832 : MemoryContextSwitchTo(oldcontext);
212 457832 : return murmurhash32(hashkey);
213 : }
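
/*
 * For illustration only: a self-contained sketch of the hash-combination
 * scheme used above.  memoize_combine_hashes() is a hypothetical helper,
 * not part of this file's real API; pg_rotate_left32() comes from
 * port/pg_bitutils.h and murmurhash32() from common/hashfn.h.
 */
#ifdef NOT_USED
static uint32
memoize_combine_hashes(const uint32 *hashes, int nkeys)
{
	uint32		hashkey = 0;

	for (int i = 0; i < nkeys; i++)
	{
		/* combine successive hashkeys by rotating */
		hashkey = pg_rotate_left32(hashkey, 1);

		/* a NULL key would contribute 0 here, as in MemoizeHash_hash() */
		hashkey ^= hashes[i];
	}

	/* finalize to spread the bits around */
	return murmurhash32(hashkey);
}
#endif							/* NOT_USED */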
214 :
215 : /*
216 : * MemoizeHash_equal
217 : * Equality function for confirming hash value matches during a hash
218 : * table lookup. 'key2' is never used. Instead the MemoizeState's
219 : * probeslot is always populated with details of what's being looked up.
220 : */
221 : static bool
222 380324 : MemoizeHash_equal(struct memoize_hash *tb, const MemoizeKey *key1,
223 : const MemoizeKey *key2)
224 : {
225 380324 : MemoizeState *mstate = (MemoizeState *) tb->private_data;
226 380324 : ExprContext *econtext = mstate->ss.ps.ps_ExprContext;
227 380324 : TupleTableSlot *tslot = mstate->tableslot;
228 380324 : TupleTableSlot *pslot = mstate->probeslot;
229 :
230 : /* probeslot should have already been prepared by prepare_probe_slot() */
231 380324 : ExecStoreMinimalTuple(key1->params, tslot, false);
232 :
233 380324 : if (mstate->binary_mode)
234 : {
235 : MemoryContext oldcontext;
236 76886 : int numkeys = mstate->nkeys;
237 76886 : bool match = true;
238 :
239 76886 : oldcontext = MemoryContextSwitchTo(econtext->ecxt_per_tuple_memory);
240 :
241 76886 : slot_getallattrs(tslot);
242 76886 : slot_getallattrs(pslot);
243 :
244 158274 : for (int i = 0; i < numkeys; i++)
245 : {
246 : FormData_pg_attribute *attr;
247 :
248 81388 : if (tslot->tts_isnull[i] != pslot->tts_isnull[i])
249 : {
250 0 : match = false;
251 0 : break;
252 : }
253 :
254 : /* both NULL? they're equal */
255 81388 : if (tslot->tts_isnull[i])
256 0 : continue;
257 :
258 : /* perform binary comparison on the two datums */
259 81388 : attr = &tslot->tts_tupleDescriptor->attrs[i];
260 81388 : if (!datum_image_eq(tslot->tts_values[i], pslot->tts_values[i],
261 81388 : attr->attbyval, attr->attlen))
262 : {
263 0 : match = false;
264 0 : break;
265 : }
266 : }
267 :
268 76886 : ResetExprContext(econtext);
269 76886 : MemoryContextSwitchTo(oldcontext);
270 76886 : return match;
271 : }
272 : else
273 : {
274 303438 : econtext->ecxt_innertuple = tslot;
275 303438 : econtext->ecxt_outertuple = pslot;
276 303438 : return ExecQualAndReset(mstate->cache_eq_expr, econtext);
277 : }
278 : }
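
/*
 * Why two comparison modes?  In binary mode the cache treats values as equal
 * only when their datum images match byte for byte.  That matters for types
 * where logically-equal values can still produce distinguishable output; for
 * example (illustrative, not taken from this file), the numeric values 1.0
 * and 1.00 compare equal but carry different display scales, so a
 * parameterized scan could return different raw output for each.  The
 * planner decides which mode is safe and records it in the plan's
 * binary_mode field.
 */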
279 :
280 : /*
281 : * Initialize the hash table to empty.
282 : */
283 : static void
284 1110 : build_hash_table(MemoizeState *mstate, uint32 size)
285 : {
286 : /* Make a guess at a good size when we're not given a valid size. */
287 1110 : if (size == 0)
288 0 : size = 1024;
289 :
290 : /* memoize_create will convert the size to a power of 2 */
291 1110 : mstate->hashtable = memoize_create(mstate->tableContext, size, mstate);
292 1110 : }
293 :
294 : /*
295 : * prepare_probe_slot
296 : * Populate mstate's probeslot with the values from the tuple stored
297              : * in 'key'. If 'key' is NULL, then populate the slot by evaluating
298 : * mstate's param_exprs.
299 : */
300 : static inline void
301 457832 : prepare_probe_slot(MemoizeState *mstate, MemoizeKey *key)
302 : {
303 457832 : TupleTableSlot *pslot = mstate->probeslot;
304 457832 : TupleTableSlot *tslot = mstate->tableslot;
305 457832 : int numKeys = mstate->nkeys;
306 :
307 457832 : ExecClearTuple(pslot);
308 :
309 457832 : if (key == NULL)
310 : {
311 455432 : ExprContext *econtext = mstate->ss.ps.ps_ExprContext;
312 : MemoryContext oldcontext;
313 :
314 455432 : oldcontext = MemoryContextSwitchTo(econtext->ecxt_per_tuple_memory);
315 :
316 : /* Set the probeslot's values based on the current parameter values */
317 915426 : for (int i = 0; i < numKeys; i++)
318 459994 : pslot->tts_values[i] = ExecEvalExpr(mstate->param_exprs[i],
319 : econtext,
320 459994 : &pslot->tts_isnull[i]);
321 :
322 455432 : MemoryContextSwitchTo(oldcontext);
323 : }
324 : else
325 : {
326 : /* Process the key's MinimalTuple and store the values in probeslot */
327 2400 : ExecStoreMinimalTuple(key->params, tslot, false);
328 2400 : slot_getallattrs(tslot);
329 2400 : memcpy(pslot->tts_values, tslot->tts_values, sizeof(Datum) * numKeys);
330 2400 : memcpy(pslot->tts_isnull, tslot->tts_isnull, sizeof(bool) * numKeys);
331 : }
332 :
333 457832 : ExecStoreVirtualTuple(pslot);
334 457832 : }
335 :
336 : /*
337 : * entry_purge_tuples
338 : * Remove all tuples from the cache entry pointed to by 'entry'. This
339 : * leaves an empty cache entry. Also, update the memory accounting to
340 : * reflect the removal of the tuples.
341 : */
342 : static inline void
343 2388 : entry_purge_tuples(MemoizeState *mstate, MemoizeEntry *entry)
344 : {
345 2388 : MemoizeTuple *tuple = entry->tuplehead;
346 2388 : uint64 freed_mem = 0;
347 :
348 4776 : while (tuple != NULL)
349 : {
350 2388 : MemoizeTuple *next = tuple->next;
351 :
352 2388 : freed_mem += CACHE_TUPLE_BYTES(tuple);
353 :
354 : /* Free memory used for this tuple */
355 2388 : pfree(tuple->mintuple);
356 2388 : pfree(tuple);
357 :
358 2388 : tuple = next;
359 : }
360 :
361 2388 : entry->complete = false;
362 2388 : entry->tuplehead = NULL;
363 :
364 : /* Update the memory accounting */
365 2388 : mstate->mem_used -= freed_mem;
366 2388 : }
367 :
368 : /*
369 : * remove_cache_entry
370 : * Remove 'entry' from the cache and free memory used by it.
371 : */
372 : static void
373 2388 : remove_cache_entry(MemoizeState *mstate, MemoizeEntry *entry)
374 : {
375 2388 : MemoizeKey *key = entry->key;
376 :
377 2388 : dlist_delete(&entry->key->lru_node);
378 :
379 : /* Remove all of the tuples from this entry */
380 2388 : entry_purge_tuples(mstate, entry);
381 :
382 : /*
383 : * Update memory accounting. entry_purge_tuples should have already
384 : * subtracted the memory used for each cached tuple. Here we just update
385 : * the amount used by the entry itself.
386 : */
387 2388 : mstate->mem_used -= EMPTY_ENTRY_MEMORY_BYTES(entry);
388 :
389 : /* Remove the entry from the cache */
390 2388 : memoize_delete_item(mstate->hashtable, entry);
391 :
392 2388 : pfree(key->params);
393 2388 : pfree(key);
394 2388 : }
395 :
396 : /*
397 : * cache_purge_all
398 : * Remove all items from the cache
399 : */
400 : static void
401 18 : cache_purge_all(MemoizeState *mstate)
402 : {
403 18 : uint64 evictions = mstate->hashtable->members;
404 18 : PlanState *pstate = (PlanState *) mstate;
405 :
406 : /*
407 : * Likely the most efficient way to remove all items is to just reset the
408 : * memory context for the cache and then rebuild a fresh hash table. This
409              : * saves having to remove each item one by one and pfree each cached tuple.
410 : */
411 18 : MemoryContextReset(mstate->tableContext);
412 :
413              : /* Rebuild the hash table at its original size */
414 18 : build_hash_table(mstate, ((Memoize *) pstate->plan)->est_entries);
415 :
416 : /* reset the LRU list */
417 18 : dlist_init(&mstate->lru_list);
418 18 : mstate->last_tuple = NULL;
419 18 : mstate->entry = NULL;
420 :
421 18 : mstate->mem_used = 0;
422 :
423 : /* XXX should we add something new to track these purges? */
424 18 : mstate->stats.cache_evictions += evictions; /* Update Stats */
425 18 : }
426 :
427 : /*
428 : * cache_reduce_memory
429 : * Evict older and less recently used items from the cache in order to
430 : * reduce the memory consumption back to something below the
431 : * MemoizeState's mem_limit.
432 : *
433 : * 'specialkey', if not NULL, causes the function to return false if the entry
434 : * which the key belongs to is removed from the cache.
435 : */
436 : static bool
437 2388 : cache_reduce_memory(MemoizeState *mstate, MemoizeKey *specialkey)
438 : {
439 2388 : bool specialkey_intact = true; /* for now */
440 : dlist_mutable_iter iter;
441 2388 : uint64 evictions = 0;
442 :
443 : /* Update peak memory usage */
444 2388 : if (mstate->mem_used > mstate->stats.mem_peak)
445 6 : mstate->stats.mem_peak = mstate->mem_used;
446 :
447 : /* We expect only to be called when we've gone over budget on memory */
448 : Assert(mstate->mem_used > mstate->mem_limit);
449 :
450              : /* Start the eviction process at the head of the LRU list. */
451 2388 : dlist_foreach_modify(iter, &mstate->lru_list)
452 : {
453 2388 : MemoizeKey *key = dlist_container(MemoizeKey, lru_node, iter.cur);
454 : MemoizeEntry *entry;
455 :
456 : /*
457 : * Populate the hash probe slot in preparation for looking up this LRU
458 : * entry.
459 : */
460 2388 : prepare_probe_slot(mstate, key);
461 :
462 : /*
463 : * Ideally the LRU list pointers would be stored in the entry itself
464 : * rather than in the key. Unfortunately, we can't do that as the
465 : * simplehash.h code may resize the table and allocate new memory for
466 : * entries which would result in those pointers pointing to the old
467 : * buckets. However, it's fine to use the key to store this as that's
468 : * only referenced by a pointer in the entry, which of course follows
469 : * the entry whenever the hash table is resized. Since we only have a
470 : * pointer to the key here, we must perform a hash table lookup to
471 : * find the entry that the key belongs to.
472 : */
473 2388 : entry = memoize_lookup(mstate->hashtable, NULL);
474 :
475 : /*
476 : * Sanity check that we found the entry belonging to the LRU list
477 : * item. A misbehaving hash or equality function could cause the
478 : * entry not to be found or the wrong entry to be found.
479 : */
480 2388 : if (unlikely(entry == NULL || entry->key != key))
481 0 : elog(ERROR, "could not find memoization table entry");
482 :
483 : /*
484 : * If we're being called to free memory while the cache is being
485 : * populated with new tuples, then we'd better take some care as we
486 : * could end up freeing the entry which 'specialkey' belongs to.
487 : * Generally callers will pass 'specialkey' as the key for the cache
488 : * entry which is currently being populated, so we must set
489 : * 'specialkey_intact' to false to inform the caller the specialkey
490 : * entry has been removed.
491 : */
492 2388 : if (key == specialkey)
493 0 : specialkey_intact = false;
494 :
495 : /*
496 : * Finally remove the entry. This will remove from the LRU list too.
497 : */
498 2388 : remove_cache_entry(mstate, entry);
499 :
500 2388 : evictions++;
501 :
502 : /* Exit if we've freed enough memory */
503 2388 : if (mstate->mem_used <= mstate->mem_limit)
504 2388 : break;
505 : }
506 :
507 2388 : mstate->stats.cache_evictions += evictions; /* Update Stats */
508 :
509 2388 : return specialkey_intact;
510 : }
511 :
512 : /*
513 : * cache_lookup
514 : * Perform a lookup to see if we've already cached tuples based on the
515              : * scan's current parameters. If we find an existing entry, we move it to
516              : * the end of the LRU list, set *found to true, then return it. If we
517              : * don't find an entry, we create a new one and add it to the end of
518 : * the LRU list. We also update cache memory accounting and remove older
519 : * entries if we go over the memory budget. If we managed to free enough
520 : * memory we return the new entry, else we return NULL.
521 : *
522 : * Callers can assume we'll never return NULL when *found is true.
523 : */
524 : static MemoizeEntry *
525 455432 : cache_lookup(MemoizeState *mstate, bool *found)
526 : {
527 : MemoizeKey *key;
528 : MemoizeEntry *entry;
529 : MemoryContext oldcontext;
530 :
531 : /* prepare the probe slot with the current scan parameters */
532 455432 : prepare_probe_slot(mstate, NULL);
533 :
534 : /*
535              : * Look up or add an entry for the current parameters. No need to pass a
536              : * valid key since the hash function uses mstate's probeslot, populated above.
537 : */
538 455432 : entry = memoize_insert(mstate->hashtable, NULL, found);
539 :
540 455432 : if (*found)
541 : {
542 : /*
543 : * Move existing entry to the tail of the LRU list to mark it as the
544 : * most recently used item.
545 : */
546 377924 : dlist_move_tail(&mstate->lru_list, &entry->key->lru_node);
547 :
548 377924 : return entry;
549 : }
550 :
551 77508 : oldcontext = MemoryContextSwitchTo(mstate->tableContext);
552 :
553 : /* Allocate a new key */
554 77508 : entry->key = key = (MemoizeKey *) palloc(sizeof(MemoizeKey));
555 77508 : key->params = ExecCopySlotMinimalTuple(mstate->probeslot);
556 :
557 : /* Update the total cache memory utilization */
558 77508 : mstate->mem_used += EMPTY_ENTRY_MEMORY_BYTES(entry);
559 :
560 : /* Initialize this entry */
561 77508 : entry->complete = false;
562 77508 : entry->tuplehead = NULL;
563 :
564 : /*
565 : * Since this is the most recently used entry, push this entry onto the
566 : * end of the LRU list.
567 : */
568 77508 : dlist_push_tail(&mstate->lru_list, &entry->key->lru_node);
569 :
570 77508 : mstate->last_tuple = NULL;
571 :
572 77508 : MemoryContextSwitchTo(oldcontext);
573 :
574 : /*
575 : * If we've gone over our memory budget, then we'll free up some space in
576 : * the cache.
577 : */
578 77508 : if (mstate->mem_used > mstate->mem_limit)
579 : {
580 : /*
581 : * Try to free up some memory. It's highly unlikely that we'll fail
582 : * to do so here since the entry we've just added is yet to contain
583 : * any tuples and we're able to remove any other entry to reduce the
584 : * memory consumption.
585 : */
586 2388 : if (unlikely(!cache_reduce_memory(mstate, key)))
587 0 : return NULL;
588 :
589 : /*
590 : * The process of removing entries from the cache may have caused the
591 : * code in simplehash.h to shuffle elements to earlier buckets in the
592 : * hash table. If it has, we'll need to find the entry again by
593 : * performing a lookup. Fortunately, we can detect if this has
594 : * happened by seeing if the entry is still in use and that the key
595 : * pointer matches our expected key.
596 : */
597 2388 : if (entry->status != memoize_SH_IN_USE || entry->key != key)
598 : {
599 : /*
600 : * We need to repopulate the probeslot as lookups performed during
601 : * the cache evictions above will have stored some other key.
602 : */
603 12 : prepare_probe_slot(mstate, key);
604 :
605 : /* Re-find the newly added entry */
606 12 : entry = memoize_lookup(mstate->hashtable, NULL);
607 : Assert(entry != NULL);
608 : }
609 : }
610 :
611 77508 : return entry;
612 : }
613 :
614 : /*
615 : * cache_store_tuple
616 : * Add the tuple stored in 'slot' to the mstate's current cache entry.
617 : * The cache entry must have already been made with cache_lookup().
618 : * mstate's last_tuple field must point to the tail of mstate->entry's
619 : * list of tuples.
620 : */
621 : static bool
622 70990 : cache_store_tuple(MemoizeState *mstate, TupleTableSlot *slot)
623 : {
624 : MemoizeTuple *tuple;
625 70990 : MemoizeEntry *entry = mstate->entry;
626 : MemoryContext oldcontext;
627 :
628 : Assert(slot != NULL);
629 : Assert(entry != NULL);
630 :
631 70990 : oldcontext = MemoryContextSwitchTo(mstate->tableContext);
632 :
633 70990 : tuple = (MemoizeTuple *) palloc(sizeof(MemoizeTuple));
634 70990 : tuple->mintuple = ExecCopySlotMinimalTuple(slot);
635 70990 : tuple->next = NULL;
636 :
637 : /* Account for the memory we just consumed */
638 70990 : mstate->mem_used += CACHE_TUPLE_BYTES(tuple);
639 :
640 70990 : if (entry->tuplehead == NULL)
641 : {
642 : /*
643 : * This is the first tuple for this entry, so just point the list head
644 : * to it.
645 : */
646 70676 : entry->tuplehead = tuple;
647 : }
648 : else
649 : {
650 : /* push this tuple onto the tail of the list */
651 314 : mstate->last_tuple->next = tuple;
652 : }
653 :
654 70990 : mstate->last_tuple = tuple;
655 70990 : MemoryContextSwitchTo(oldcontext);
656 :
657 : /*
658 : * If we've gone over our memory budget then free up some space in the
659 : * cache.
660 : */
661 70990 : if (mstate->mem_used > mstate->mem_limit)
662 : {
663 0 : MemoizeKey *key = entry->key;
664 :
665 0 : if (!cache_reduce_memory(mstate, key))
666 0 : return false;
667 :
668 : /*
669 : * The process of removing entries from the cache may have caused the
670 : * code in simplehash.h to shuffle elements to earlier buckets in the
671 : * hash table. If it has, we'll need to find the entry again by
672 : * performing a lookup. Fortunately, we can detect if this has
673 : * happened by seeing if the entry is still in use and that the key
674 : * pointer matches our expected key.
675 : */
676 0 : if (entry->status != memoize_SH_IN_USE || entry->key != key)
677 : {
678 : /*
679 : * We need to repopulate the probeslot as lookups performed during
680 : * the cache evictions above will have stored some other key.
681 : */
682 0 : prepare_probe_slot(mstate, key);
683 :
684 : /* Re-find the entry */
685 0 : mstate->entry = entry = memoize_lookup(mstate->hashtable, NULL);
686 : Assert(entry != NULL);
687 : }
688 : }
689 :
690 70990 : return true;
691 : }
692 :
693 : static TupleTableSlot *
694 588850 : ExecMemoize(PlanState *pstate)
695 : {
696 588850 : MemoizeState *node = castNode(MemoizeState, pstate);
697 : PlanState *outerNode;
698 : TupleTableSlot *slot;
699 :
700 588850 : switch (node->mstatus)
701 : {
702 455432 : case MEMO_CACHE_LOOKUP:
703 : {
704 : MemoizeEntry *entry;
705 : TupleTableSlot *outerslot;
706 : bool found;
707 :
708 : Assert(node->entry == NULL);
709 :
710 : /*
711 : * We're only ever in this state for the first call of the
712              : * scan. Here we check whether we've seen the current
713              : * parameters before, and if so, whether we've already cached
714              : * a complete set of records that the outer plan will return
715              : * for these parameters.
716 : *
717 : * When we find a valid cache entry, we'll return the first
718 : * tuple from it. If not found, we'll create a cache entry and
719 : * then try to fetch a tuple from the outer scan. If we find
720 : * one there, we'll try to cache it.
721 : */
722 :
723 : /* see if we've got anything cached for the current parameters */
724 455432 : entry = cache_lookup(node, &found);
725 :
726 455432 : if (found && entry->complete)
727 : {
728 377924 : node->stats.cache_hits += 1; /* stats update */
729 :
730 : /*
731 : * Set last_tuple and entry so that the state
732 : * MEMO_CACHE_FETCH_NEXT_TUPLE can easily find the next
733 : * tuple for these parameters.
734 : */
735 377924 : node->last_tuple = entry->tuplehead;
736 377924 : node->entry = entry;
737 :
738 : /* Fetch the first cached tuple, if there is one */
739 377924 : if (entry->tuplehead)
740 : {
741 105852 : node->mstatus = MEMO_CACHE_FETCH_NEXT_TUPLE;
742 :
743 105852 : slot = node->ss.ps.ps_ResultTupleSlot;
744 105852 : ExecStoreMinimalTuple(entry->tuplehead->mintuple,
745 : slot, false);
746 :
747 105852 : return slot;
748 : }
749 :
750              : /* The cache entry contains no tuples. */
751 272072 : node->mstatus = MEMO_END_OF_SCAN;
752 272072 : return NULL;
753 : }
754 :
755 : /* Handle cache miss */
756 77508 : node->stats.cache_misses += 1; /* stats update */
757 :
758 77508 : if (found)
759 : {
760 : /*
761 : * A cache entry was found, but the scan for that entry
762 : * did not run to completion. We'll just remove all
763 : * tuples and start again. It might be tempting to
764 : * continue where we left off, but there's no guarantee
765 : * the outer node will produce the tuples in the same
766 : * order as it did last time.
767 : */
768 0 : entry_purge_tuples(node, entry);
769 : }
770 :
771 : /* Scan the outer node for a tuple to cache */
772 77508 : outerNode = outerPlanState(node);
773 77508 : outerslot = ExecProcNode(outerNode);
774 77508 : if (TupIsNull(outerslot))
775 : {
776 : /*
777 : * cache_lookup may have returned NULL due to failure to
778 : * free enough cache space, so ensure we don't do anything
779 : * here that assumes it worked. There's no need to go into
780 : * bypass mode here as we're setting mstatus to end of
781 : * scan.
782 : */
783 6832 : if (likely(entry))
784 6832 : entry->complete = true;
785 :
786 6832 : node->mstatus = MEMO_END_OF_SCAN;
787 6832 : return NULL;
788 : }
789 :
790 70676 : node->entry = entry;
791 :
792 : /*
793 : * If we failed to create the entry or failed to store the
794 : * tuple in the entry, then go into bypass mode.
795 : */
796 70676 : if (unlikely(entry == NULL ||
797 : !cache_store_tuple(node, outerslot)))
798 : {
799 0 : node->stats.cache_overflows += 1; /* stats update */
800 :
801 0 : node->mstatus = MEMO_CACHE_BYPASS_MODE;
802 :
803 : /*
804 : * No need to clear out last_tuple as we'll stay in bypass
805 : * mode until the end of the scan.
806 : */
807 : }
808 : else
809 : {
810 : /*
811 : * If we only expect a single row from this scan then we
812 : * can mark that we're not expecting more. This allows
813 : * cache lookups to work even when the scan has not been
814 : * executed to completion.
815 : */
816 70676 : entry->complete = node->singlerow;
817 70676 : node->mstatus = MEMO_FILLING_CACHE;
818 : }
819 :
820 70676 : slot = node->ss.ps.ps_ResultTupleSlot;
821 70676 : ExecCopySlot(slot, outerslot);
822 70676 : return slot;
823 : }
824 :
825 66108 : case MEMO_CACHE_FETCH_NEXT_TUPLE:
826 : {
827 : /* We shouldn't be in this state if these are not set */
828 : Assert(node->entry != NULL);
829 : Assert(node->last_tuple != NULL);
830 :
831 : /* Skip to the next tuple to output */
832 66108 : node->last_tuple = node->last_tuple->next;
833 :
834 : /* No more tuples in the cache */
835 66108 : if (node->last_tuple == NULL)
836 : {
837 61128 : node->mstatus = MEMO_END_OF_SCAN;
838 61128 : return NULL;
839 : }
840 :
841 4980 : slot = node->ss.ps.ps_ResultTupleSlot;
842 4980 : ExecStoreMinimalTuple(node->last_tuple->mintuple, slot,
843 : false);
844 :
845 4980 : return slot;
846 : }
847 :
848 67310 : case MEMO_FILLING_CACHE:
849 : {
850 : TupleTableSlot *outerslot;
851 67310 : MemoizeEntry *entry = node->entry;
852 :
853 : /* entry should already have been set by MEMO_CACHE_LOOKUP */
854 : Assert(entry != NULL);
855 :
856 : /*
857 : * When in the MEMO_FILLING_CACHE state, we've just had a
858 : * cache miss and are populating the cache with the current
859 : * scan tuples.
860 : */
861 67310 : outerNode = outerPlanState(node);
862 67310 : outerslot = ExecProcNode(outerNode);
863 67310 : if (TupIsNull(outerslot))
864 : {
865 : /* No more tuples. Mark it as complete */
866 66996 : entry->complete = true;
867 66996 : node->mstatus = MEMO_END_OF_SCAN;
868 66996 : return NULL;
869 : }
870 :
871 : /*
872 : * Validate if the planner properly set the singlerow flag. It
873 : * should only set that if each cache entry can, at most,
874 : * return 1 row.
875 : */
876 314 : if (unlikely(entry->complete))
877 0 : elog(ERROR, "cache entry already complete");
878 :
879 : /* Record the tuple in the current cache entry */
880 314 : if (unlikely(!cache_store_tuple(node, outerslot)))
881 : {
882 : /* Couldn't store it? Handle overflow */
883 0 : node->stats.cache_overflows += 1; /* stats update */
884 :
885 0 : node->mstatus = MEMO_CACHE_BYPASS_MODE;
886 :
887 : /*
888 : * No need to clear out entry or last_tuple as we'll stay
889 : * in bypass mode until the end of the scan.
890 : */
891 : }
892 :
893 314 : slot = node->ss.ps.ps_ResultTupleSlot;
894 314 : ExecCopySlot(slot, outerslot);
895 314 : return slot;
896 : }
897 :
898 0 : case MEMO_CACHE_BYPASS_MODE:
899 : {
900 : TupleTableSlot *outerslot;
901 :
902 : /*
903 : * When in bypass mode we just continue to read tuples without
904 : * caching. We need to wait until the next rescan before we
905 : * can come out of this mode.
906 : */
907 0 : outerNode = outerPlanState(node);
908 0 : outerslot = ExecProcNode(outerNode);
909 0 : if (TupIsNull(outerslot))
910 : {
911 0 : node->mstatus = MEMO_END_OF_SCAN;
912 0 : return NULL;
913 : }
914 :
915 0 : slot = node->ss.ps.ps_ResultTupleSlot;
916 0 : ExecCopySlot(slot, outerslot);
917 0 : return slot;
918 : }
919 :
920 0 : case MEMO_END_OF_SCAN:
921 :
922 : /*
923              : * We've already returned NULL for this scan, but guard against
924              : * anything that mistakenly calls us again.
925 : */
926 0 : return NULL;
927 :
928 0 : default:
929 0 : elog(ERROR, "unrecognized memoize state: %d",
930 : (int) node->mstatus);
931 : return NULL;
932 : } /* switch */
933 : }
934 :
935 : MemoizeState *
936 1092 : ExecInitMemoize(Memoize *node, EState *estate, int eflags)
937 : {
938 1092 : MemoizeState *mstate = makeNode(MemoizeState);
939 : Plan *outerNode;
940 : int i;
941 : int nkeys;
942 : Oid *eqfuncoids;
943 :
944 : /* check for unsupported flags */
945 : Assert(!(eflags & (EXEC_FLAG_BACKWARD | EXEC_FLAG_MARK)));
946 :
947 1092 : mstate->ss.ps.plan = (Plan *) node;
948 1092 : mstate->ss.ps.state = estate;
949 1092 : mstate->ss.ps.ExecProcNode = ExecMemoize;
950 :
951 : /*
952 : * Miscellaneous initialization
953 : *
954 : * create expression context for node
955 : */
956 1092 : ExecAssignExprContext(estate, &mstate->ss.ps);
957 :
958 1092 : outerNode = outerPlan(node);
959 1092 : outerPlanState(mstate) = ExecInitNode(outerNode, estate, eflags);
960 :
961 : /*
962 : * Initialize return slot and type. No need to initialize projection info
963 : * because this node doesn't do projections.
964 : */
965 1092 : ExecInitResultTupleSlotTL(&mstate->ss.ps, &TTSOpsMinimalTuple);
966 1092 : mstate->ss.ps.ps_ProjInfo = NULL;
967 :
968 : /*
969 : * Initialize scan slot and type.
970 : */
971 1092 : ExecCreateScanSlotFromOuterPlan(estate, &mstate->ss, &TTSOpsMinimalTuple);
972 :
973 : /*
974              : * Set the state machine to look up the cache. We won't find anything
975 : * until we cache something, but this saves a special case to create the
976 : * first entry.
977 : */
978 1092 : mstate->mstatus = MEMO_CACHE_LOOKUP;
979 :
980 1092 : mstate->nkeys = nkeys = node->numKeys;
981 1092 : mstate->hashkeydesc = ExecTypeFromExprList(node->param_exprs);
982 1092 : mstate->tableslot = MakeSingleTupleTableSlot(mstate->hashkeydesc,
983 : &TTSOpsMinimalTuple);
984 1092 : mstate->probeslot = MakeSingleTupleTableSlot(mstate->hashkeydesc,
985 : &TTSOpsVirtual);
986 :
987 1092 : mstate->param_exprs = (ExprState **) palloc(nkeys * sizeof(ExprState *));
988 1092 : mstate->collations = node->collations; /* Just point directly to the plan
989 : * data */
990 1092 : mstate->hashfunctions = (FmgrInfo *) palloc(nkeys * sizeof(FmgrInfo));
991 :
992 1092 : eqfuncoids = palloc(nkeys * sizeof(Oid));
993 :
994 2202 : for (i = 0; i < nkeys; i++)
995 : {
996 1110 : Oid hashop = node->hashOperators[i];
997 : Oid left_hashfn;
998 : Oid right_hashfn;
999 1110 : Expr *param_expr = (Expr *) list_nth(node->param_exprs, i);
1000 :
1001 1110 : if (!get_op_hash_functions(hashop, &left_hashfn, &right_hashfn))
1002 0 : elog(ERROR, "could not find hash function for hash operator %u",
1003 : hashop);
1004 :
1005 1110 : fmgr_info(left_hashfn, &mstate->hashfunctions[i]);
1006 :
1007 1110 : mstate->param_exprs[i] = ExecInitExpr(param_expr, (PlanState *) mstate);
1008 1110 : eqfuncoids[i] = get_opcode(hashop);
1009 : }
1010 :
1011 2184 : mstate->cache_eq_expr = ExecBuildParamSetEqual(mstate->hashkeydesc,
1012 : &TTSOpsMinimalTuple,
1013 : &TTSOpsVirtual,
1014 : eqfuncoids,
1015 1092 : node->collations,
1016 1092 : node->param_exprs,
1017 : (PlanState *) mstate);
1018 :
1019 1092 : pfree(eqfuncoids);
1020 1092 : mstate->mem_used = 0;
1021 :
1022 : /* Limit the total memory consumed by the cache to this */
1023 1092 : mstate->mem_limit = get_hash_memory_limit();
1024 :
1025 : /* A memory context dedicated for the cache */
1026 1092 : mstate->tableContext = AllocSetContextCreate(CurrentMemoryContext,
1027 : "MemoizeHashTable",
1028 : ALLOCSET_DEFAULT_SIZES);
1029 :
1030 1092 : dlist_init(&mstate->lru_list);
1031 1092 : mstate->last_tuple = NULL;
1032 1092 : mstate->entry = NULL;
1033 :
1034 : /*
1035              : * Mark if we can assume the cache entry is complete after we get the
1036              : * first record for it. Some callers might not call us again after
1037              : * getting the first match, e.g. a join operator performing a unique
1038              : * join is able to skip to the next outer tuple after getting the first
1039              : * matching inner tuple. In this case, the cache entry is complete after
1040              : * getting the first tuple, so we can mark it as such.
1041 : */
1042 1092 : mstate->singlerow = node->singlerow;
1043 1092 : mstate->keyparamids = node->keyparamids;
1044 :
1045 : /*
1046 : * Record if the cache keys should be compared bit by bit, or logically
1047              : * using the type's hash equality operator.
1048 : */
1049 1092 : mstate->binary_mode = node->binary_mode;
1050 :
1051 : /* Zero the statistics counters */
1052 1092 : memset(&mstate->stats, 0, sizeof(MemoizeInstrumentation));
1053 :
1054 : /* Allocate and set up the actual cache */
1055 1092 : build_hash_table(mstate, node->est_entries);
1056 :
1057 1092 : return mstate;
1058 : }
1059 :
1060 : void
1061 1092 : ExecEndMemoize(MemoizeState *node)
1062 : {
1063 : #ifdef USE_ASSERT_CHECKING
1064 : /* Validate the memory accounting code is correct in assert builds. */
1065 : {
1066 : int count;
1067 : uint64 mem = 0;
1068 : memoize_iterator i;
1069 : MemoizeEntry *entry;
1070 :
1071 : memoize_start_iterate(node->hashtable, &i);
1072 :
1073 : count = 0;
1074 : while ((entry = memoize_iterate(node->hashtable, &i)) != NULL)
1075 : {
1076 : MemoizeTuple *tuple = entry->tuplehead;
1077 :
1078 : mem += EMPTY_ENTRY_MEMORY_BYTES(entry);
1079 : while (tuple != NULL)
1080 : {
1081 : mem += CACHE_TUPLE_BYTES(tuple);
1082 : tuple = tuple->next;
1083 : }
1084 : count++;
1085 : }
1086 :
1087 : Assert(count == node->hashtable->members);
1088 : Assert(mem == node->mem_used);
1089 : }
1090 : #endif
1091 :
1092 : /*
1093 : * When ending a parallel worker, copy the statistics gathered by the
1094 : * worker back into shared memory so that it can be picked up by the main
1095 : * process to report in EXPLAIN ANALYZE.
1096 : */
1097 1092 : if (node->shared_info != NULL && IsParallelWorker())
1098 : {
1099 : MemoizeInstrumentation *si;
1100 :
1101 : /* Make mem_peak available for EXPLAIN */
1102 0 : if (node->stats.mem_peak == 0)
1103 0 : node->stats.mem_peak = node->mem_used;
1104 :
1105 : Assert(ParallelWorkerNumber <= node->shared_info->num_workers);
1106 0 : si = &node->shared_info->sinstrument[ParallelWorkerNumber];
1107 0 : memcpy(si, &node->stats, sizeof(MemoizeInstrumentation));
1108 : }
1109 :
1110 : /* Remove the cache context */
1111 1092 : MemoryContextDelete(node->tableContext);
1112 :
1113 : /*
1114 : * shut down the subplan
1115 : */
1116 1092 : ExecEndNode(outerPlanState(node));
1117 1092 : }
1118 :
1119 : void
1120 455432 : ExecReScanMemoize(MemoizeState *node)
1121 : {
1122 455432 : PlanState *outerPlan = outerPlanState(node);
1123 :
1124 : /* Mark that we must lookup the cache for a new set of parameters */
1125 455432 : node->mstatus = MEMO_CACHE_LOOKUP;
1126 :
1127 : /* nullify pointers used for the last scan */
1128 455432 : node->entry = NULL;
1129 455432 : node->last_tuple = NULL;
1130 :
1131 : /*
1132 : * if chgParam of subnode is not null then plan will be re-scanned by
1133 : * first ExecProcNode.
1134 : */
1135 455432 : if (outerPlan->chgParam == NULL)
1136 0 : ExecReScan(outerPlan);
1137 :
1138 : /*
1139 : * Purge the entire cache if a parameter changed that is not part of the
1140 : * cache key.
1141 : */
1142 455432 : if (bms_nonempty_difference(outerPlan->chgParam, node->keyparamids))
1143 18 : cache_purge_all(node);
1144 455432 : }
1145 :
1146 : /*
1147 : * ExecEstimateCacheEntryOverheadBytes
1148 : * For use in the query planner to help it estimate the amount of memory
1149 : * required to store a single entry in the cache.
1150 : */
1151 : double
1152 203206 : ExecEstimateCacheEntryOverheadBytes(double ntuples)
1153 : {
1154 203206 : return sizeof(MemoizeEntry) + sizeof(MemoizeKey) + sizeof(MemoizeTuple) *
1155 : ntuples;
1156 : }
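
/*
 * As a rough worked example (assuming a typical 64-bit build, where
 * MemoizeEntry and MemoizeKey are 24 bytes each and MemoizeTuple is 16
 * bytes): an entry expected to hold 10 tuples is costed at about
 * 24 + 24 + 16 * 10 = 208 bytes of overhead, excluding the cached tuples
 * themselves.
 */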
1157 :
1158 : /* ----------------------------------------------------------------
1159 : * Parallel Query Support
1160 : * ----------------------------------------------------------------
1161 : */
1162 :
1163 : /* ----------------------------------------------------------------
1164 : * ExecMemoizeEstimate
1165 : *
1166 : * Estimate space required to propagate memoize statistics.
1167 : * ----------------------------------------------------------------
1168 : */
1169 : void
1170 6 : ExecMemoizeEstimate(MemoizeState *node, ParallelContext *pcxt)
1171 : {
1172 : Size size;
1173 :
1174 : /* don't need this if not instrumenting or no workers */
1175 6 : if (!node->ss.ps.instrument || pcxt->nworkers == 0)
1176 6 : return;
1177 :
1178 0 : size = mul_size(pcxt->nworkers, sizeof(MemoizeInstrumentation));
1179 0 : size = add_size(size, offsetof(SharedMemoizeInfo, sinstrument));
1180 0 : shm_toc_estimate_chunk(&pcxt->estimator, size);
1181 0 : shm_toc_estimate_keys(&pcxt->estimator, 1);
1182 : }
1183 :
1184 : /* ----------------------------------------------------------------
1185 : * ExecMemoizeInitializeDSM
1186 : *
1187 : * Initialize DSM space for memoize statistics.
1188 : * ----------------------------------------------------------------
1189 : */
1190 : void
1191 6 : ExecMemoizeInitializeDSM(MemoizeState *node, ParallelContext *pcxt)
1192 : {
1193 : Size size;
1194 :
1195 : /* don't need this if not instrumenting or no workers */
1196 6 : if (!node->ss.ps.instrument || pcxt->nworkers == 0)
1197 6 : return;
1198 :
1199 0 : size = offsetof(SharedMemoizeInfo, sinstrument)
1200 0 : + pcxt->nworkers * sizeof(MemoizeInstrumentation);
1201 0 : node->shared_info = shm_toc_allocate(pcxt->toc, size);
1202 : /* ensure any unfilled slots will contain zeroes */
1203 0 : memset(node->shared_info, 0, size);
1204 0 : node->shared_info->num_workers = pcxt->nworkers;
1205 0 : shm_toc_insert(pcxt->toc, node->ss.ps.plan->plan_node_id,
1206 0 : node->shared_info);
1207 : }
1208 :
1209 : /* ----------------------------------------------------------------
1210 : * ExecMemoizeInitializeWorker
1211 : *
1212 : * Attach worker to DSM space for memoize statistics.
1213 : * ----------------------------------------------------------------
1214 : */
1215 : void
1216 12 : ExecMemoizeInitializeWorker(MemoizeState *node, ParallelWorkerContext *pwcxt)
1217 : {
1218 12 : node->shared_info =
1219 12 : shm_toc_lookup(pwcxt->toc, node->ss.ps.plan->plan_node_id, true);
1220 12 : }
1221 :
1222 : /* ----------------------------------------------------------------
1223 : * ExecMemoizeRetrieveInstrumentation
1224 : *
1225 : * Transfer memoize statistics from DSM to private memory.
1226 : * ----------------------------------------------------------------
1227 : */
1228 : void
1229 0 : ExecMemoizeRetrieveInstrumentation(MemoizeState *node)
1230 : {
1231 : Size size;
1232 : SharedMemoizeInfo *si;
1233 :
1234 0 : if (node->shared_info == NULL)
1235 0 : return;
1236 :
1237 0 : size = offsetof(SharedMemoizeInfo, sinstrument)
1238 0 : + node->shared_info->num_workers * sizeof(MemoizeInstrumentation);
1239 0 : si = palloc(size);
1240 0 : memcpy(si, node->shared_info, size);
1241 0 : node->shared_info = si;
1242 : }