Skip to content

Package: TtlCacheManager$CacheSweeper

TtlCacheManager$CacheSweeper

name | instruction | branch | complexity | line | method
TtlCacheManager.CacheSweeper(TtlCacheManager)
M: 0 C: 6
100%
M: 0 C: 0
100%
M: 0 C: 1
100%
M: 0 C: 1
100%
M: 0 C: 1
100%
run()
M: 1 C: 68
99%
M: 1 C: 5
83%
M: 1 C: 3
75%
M: 1 C: 15
94%
M: 0 C: 1
100%

Coverage

1: /**
2: * Copyright (C) 2020 Czech Technical University in Prague
3: *
4: * This program is free software: you can redistribute it and/or modify it under
5: * the terms of the GNU General Public License as published by the Free Software
6: * Foundation, either version 3 of the License, or (at your option) any
7: * later version.
8: *
9: * This program is distributed in the hope that it will be useful, but WITHOUT
10: * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
11: * FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
12: * details. You should have received a copy of the GNU General Public License
13: * along with this program. If not, see <http://www.gnu.org/licenses/>.
14: */
15: package cz.cvut.kbss.jopa.sessions.cache;
16:
17: import cz.cvut.kbss.jopa.model.JOPAPersistenceProperties;
18: import cz.cvut.kbss.jopa.model.descriptors.Descriptor;
19: import cz.cvut.kbss.jopa.sessions.CacheManager;
20: import cz.cvut.kbss.jopa.utils.ErrorUtils;
21: import org.slf4j.Logger;
22: import org.slf4j.LoggerFactory;
23:
24: import java.net.URI;
25: import java.util.*;
26: import java.util.Map.Entry;
27: import java.util.concurrent.Executors;
28: import java.util.concurrent.Future;
29: import java.util.concurrent.ScheduledExecutorService;
30: import java.util.concurrent.TimeUnit;
31: import java.util.concurrent.locks.Lock;
32: import java.util.concurrent.locks.ReentrantReadWriteLock;
33:
34: /**
35: * Manages the second level cache shared by all persistence contexts.
36: * <p>
37: * This implementation of CacheManager uses cache-wide locking, i. e. the whole cache is locked when an entity is being
38: * put in it, no matter that only one context is affected by the change.
39: * <p>
40: * This cache is swept regularly by a dedicated thread, which removes all entries whose time-to-live (TTL) has
41: * expired.
42: */
43: public class TtlCacheManager implements CacheManager {
44:
45: private static final Logger LOG = LoggerFactory.getLogger(TtlCacheManager.class);
46:
47: /**
48: * Default time to live in millis
49: */
50: private static final long DEFAULT_TTL = 60000L;
51: // Initial delay is the sweep rate multiplied by this multiplier
52: private static final int DELAY_MULTIPLIER = 2;
53: /**
54: * Default sweep rate in millis
55: */
56: private static final long DEFAULT_SWEEP_RATE = 30000L;
57:
58: private Set<Class<?>> inferredClasses;
59:
60: private TtlCache cache;
61:
62: // Each repository can have its own lock and they could be acquired by this
63: // instance itself, no need to pass this burden to callers
64: private final Lock readLock;
65: private final Lock writeLock;
66:
67: private final ScheduledExecutorService sweeperScheduler;
68: private Future<?> sweeperFuture;
69: private long initDelay;
70: private long sweepRate;
71: private long timeToLive;
72: private volatile boolean sweepRunning;
73:
74: public TtlCacheManager(Map<String, String> properties) {
75: this.cache = new TtlCache();
76: initSettings(properties);
77: final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
78: this.readLock = lock.readLock();
79: this.writeLock = lock.writeLock();
80: this.sweeperScheduler = Executors.newSingleThreadScheduledExecutor();
81: this.sweeperFuture = sweeperScheduler.scheduleAtFixedRate(new CacheSweeper(), initDelay, sweepRate,
82: TimeUnit.MILLISECONDS);
83: }
84:
85: private void initSettings(Map<String, String> properties) {
86: if (!properties.containsKey(JOPAPersistenceProperties.CACHE_TTL)) {
87: this.timeToLive = DEFAULT_TTL;
88: } else {
89: final String strCacheTtl = properties.get(JOPAPersistenceProperties.CACHE_TTL);
90: try {
91: // The property is in seconds, we need milliseconds
92: this.timeToLive = Long.parseLong(strCacheTtl) * 1000;
93: } catch (NumberFormatException e) {
94: LOG.warn("Unable to parse cache time to live setting value {}, using default value.", strCacheTtl);
95: this.timeToLive = DEFAULT_TTL;
96: }
97: }
98: if (!properties.containsKey(JOPAPersistenceProperties.CACHE_SWEEP_RATE)) {
99: this.sweepRate = DEFAULT_SWEEP_RATE;
100: } else {
101: final String strSweepRate = properties
102: .get(JOPAPersistenceProperties.CACHE_SWEEP_RATE);
103: try {
104: // The property is in seconds, we need milliseconds
105: this.sweepRate = Long.parseLong(strSweepRate) * 1000;
106: } catch (NumberFormatException e) {
107: LOG.warn("Unable to parse sweep rate setting value {}, using default value.", strSweepRate);
108: this.sweepRate = DEFAULT_SWEEP_RATE;
109: }
110: }
111: this.initDelay = DELAY_MULTIPLIER * sweepRate;
112: }
113:
114: @Override
115: public void close() {
116: if (sweeperFuture != null) {
117: LOG.debug("Stopping cache sweeper.");
118: sweeperFuture.cancel(true);
119: sweeperScheduler.shutdown();
120: }
121: this.sweeperFuture = null;
122: }
123:
124: @Override
125: public void add(Object primaryKey, Object entity, Descriptor descriptor) {
126: Objects.requireNonNull(primaryKey, ErrorUtils.getNPXMessageSupplier("primaryKey"));
127: Objects.requireNonNull(entity, ErrorUtils.getNPXMessageSupplier("entity"));
128: Objects.requireNonNull(descriptor, ErrorUtils.getNPXMessageSupplier("descriptor"));
129:
130: acquireWriteLock();
131: try {
132: cache.put(primaryKey, entity, descriptor);
133: } finally {
134: releaseWriteLock();
135: }
136: }
137:
138: /**
139: * Releases the live object cache.
140: */
141: private void releaseCache() {
142: acquireWriteLock();
143: try {
144: this.cache = new TtlCache();
145: } finally {
146: releaseWriteLock();
147: }
148: }
149:
150: @Override
151: public void evictInferredObjects() {
152: acquireWriteLock();
153: try {
154: getInferredClasses().forEach(cache::evict);
155: } finally {
156: releaseWriteLock();
157: }
158: }
159:
160: @Override
161: public <T> T get(Class<T> cls, Object primaryKey, Descriptor descriptor) {
162: if (cls == null || primaryKey == null || descriptor == null) {
163: return null;
164: }
165: acquireReadLock();
166: try {
167: return cache.get(cls, primaryKey, descriptor);
168: } finally {
169: releaseReadLock();
170: }
171: }
172:
173: /**
174: * Get the set of inferred classes.
175: * <p>
176: * Inferred classes (i. e. classes with inferred attributes) are tracked separately since they require special
177: * behavior.
178: *
179: * @return Set of inferred classes
180: */
181: public Set<Class<?>> getInferredClasses() {
182: if (inferredClasses == null) {
183: this.inferredClasses = new HashSet<>();
184: }
185: return inferredClasses;
186: }
187:
188: /**
189: * Set the inferred classes.
190: * <p>
191: * For more information about inferred classes see {@link #getInferredClasses()}.
192: *
193: * @param inferredClasses The set of inferred classes
194: */
195: @Override
196: public void setInferredClasses(Set<Class<?>> inferredClasses) {
197: this.inferredClasses = inferredClasses;
198: }
199:
200: @Override
201: public boolean contains(Class<?> cls, Object primaryKey, Descriptor descriptor) {
202: if (cls == null || primaryKey == null || descriptor == null) {
203: return false;
204: }
205: acquireReadLock();
206: try {
207: return cache.contains(cls, primaryKey, descriptor);
208: } finally {
209: releaseReadLock();
210: }
211: }
212:
213: @Override
214: public void evict(Class<?> cls) {
215: Objects.requireNonNull(cls);
216:
217: acquireWriteLock();
218: try {
219: cache.evict(cls);
220: } finally {
221: releaseWriteLock();
222: }
223: }
224:
225: @Override
226: public void evict(Class<?> cls, Object identifier, URI context) {
227: Objects.requireNonNull(cls, ErrorUtils.getNPXMessageSupplier("cls"));
228: Objects.requireNonNull(identifier, ErrorUtils.getNPXMessageSupplier("primaryKey"));
229:
230: acquireWriteLock();
231: try {
232: cache.evict(cls, identifier, context);
233: } finally {
234: releaseWriteLock();
235: }
236:
237: }
238:
239: @Override
240: public void evict(URI context) {
241: acquireWriteLock();
242: try {
243: cache.evict(context);
244: } finally {
245: releaseWriteLock();
246: }
247: }
248:
249: @Override
250: public void evictAll() {
251: releaseCache();
252: }
253:
254: private void acquireReadLock() {
255: readLock.lock();
256: }
257:
258: private void releaseReadLock() {
259: readLock.unlock();
260: }
261:
262: private void acquireWriteLock() {
263: writeLock.lock();
264: }
265:
266: private void releaseWriteLock() {
267: writeLock.unlock();
268: }
269:
270: /**
271: * Sweeps the second level cache and removes entities with no more time to live.
272: */
273: private final class CacheSweeper implements Runnable {
274:
275: @Override
276: public void run() {
277: LOG.trace("Running cache sweep.");
278: TtlCacheManager.this.acquireWriteLock();
279: try {
280:• if (TtlCacheManager.this.sweepRunning) {
281: return;
282: }
283: TtlCacheManager.this.sweepRunning = true;
284: final long currentTime = System.currentTimeMillis();
285: final List<URI> toEvict = new ArrayList<>();
286: // Mark the objects for eviction (can't evict them now, it would
287: // cause ConcurrentModificationException)
288:• for (Entry<URI, Long> e : cache.ttls.entrySet()) {
289: final long lm = e.getValue();
290:• if (lm + TtlCacheManager.this.timeToLive < currentTime) {
291: toEvict.add(e.getKey());
292: }
293: }
294: // Evict them
295: toEvict.forEach(TtlCacheManager.this::evict);
296: } finally {
297: TtlCacheManager.this.sweepRunning = false;
298: TtlCacheManager.this.releaseWriteLock();
299: }
300: }
301: }
302:
303: private static final class TtlCache extends EntityCache {
304:
305: private final Map<URI, Long> ttls = new HashMap<>();
306:
307: @Override
308: void put(Object identifier, Object entity, Descriptor descriptor) {
309: super.put(identifier, entity, descriptor);
310: final URI ctx = descriptor.getContext() != null ? descriptor.getContext() : defaultContext;
311: updateTimeToLive(ctx);
312: }
313:
314: @Override
315: <T> T get(Class<T> cls, Object identifier, Descriptor descriptor) {
316: assert cls != null;
317: assert identifier != null;
318:
319: final URI ctx = descriptor.getContext() != null ? descriptor.getContext() : defaultContext;
320: T result = super.get(cls, identifier, descriptor);
321: if (result != null) {
322: updateTimeToLive(ctx);
323: }
324: return result;
325: }
326:
327: private void updateTimeToLive(URI context) {
328: assert context != null;
329:
330: ttls.put(context, System.currentTimeMillis());
331: }
332:
333: @Override
334: void evict(URI context) {
335: final URI ctx = context != null ? context : defaultContext;
336: super.evict(ctx);
337: ttls.remove(context);
338: }
339:
340: @Override
341: void evict(Class<?> cls) {
342: for (Entry<URI, Map<Object, Map<Class<?>, Object>>> e : repoCache.entrySet()) {
343: final Map<Object, Map<Class<?>, Object>> m = e.getValue();
344: for (Entry<Object, Map<Class<?>, Object>> indNode : m.entrySet()) {
345: final Object instance = indNode.getValue().remove(cls);
346: if (instance != null) {
347: descriptors.remove(instance);
348: }
349: }
350: if (m.isEmpty()) {
351: ttls.remove(e.getKey());
352: }
353: }
354: }
355: }
356: }