Package: TtlCacheManager$CacheSweeper
TtlCacheManager$CacheSweeper
name | instruction | branch | complexity | line | method | ||||||||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
TtlCacheManager.CacheSweeper(TtlCacheManager) |
|
|
|
|
|
||||||||||||||||||||
run() |
|
|
|
|
|
Coverage
1: /**
2: * Copyright (C) 2016 Czech Technical University in Prague
3: * <p>
4: * This program is free software: you can redistribute it and/or modify it under
5: * the terms of the GNU General Public License as published by the Free Software
6: * Foundation, either version 3 of the License, or (at your option) any
7: * later version.
8: * <p>
9: * This program is distributed in the hope that it will be useful, but WITHOUT
10: * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
11: * FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
12: * details. You should have received a copy of the GNU General Public License
13: * along with this program. If not, see <http://www.gnu.org/licenses/>.
14: */
15: package cz.cvut.kbss.jopa.sessions.cache;
16:
17: import cz.cvut.kbss.jopa.model.JOPAPersistenceProperties;
18: import cz.cvut.kbss.jopa.sessions.CacheManager;
19: import cz.cvut.kbss.jopa.utils.ErrorUtils;
20: import org.slf4j.Logger;
21: import org.slf4j.LoggerFactory;
22:
23: import java.net.URI;
24: import java.util.*;
25: import java.util.Map.Entry;
26: import java.util.concurrent.Executors;
27: import java.util.concurrent.Future;
28: import java.util.concurrent.ScheduledExecutorService;
29: import java.util.concurrent.TimeUnit;
30: import java.util.concurrent.locks.Lock;
31: import java.util.concurrent.locks.ReentrantReadWriteLock;
32:
33: /**
34: * Manages the second level cache shared by all persistence contexts.
35: * <p>
36: * This implementation of CacheManager uses cache-wide locking, i. e. the whole cache is locked when an entity is being
37: * put in it, no matter that only one context is affected by the change.
38: * <p>
39: * This cache is swept regularly by a dedicated thread, which removes all entries whose time-to-live (TTL) has
40: * expired.
41: */
42: public class TtlCacheManager implements CacheManager {
43:
44: private static final Logger LOG = LoggerFactory.getLogger(TtlCacheManager.class);
45:
46: /**
47: * Default time to live in millis
48: */
49: private static final long DEFAULT_TTL = 60000L;
50: // Initial delay is the sweep rate multiplied by this multiplier
51: private static final int DELAY_MULTIPLIER = 2;
52: /**
53: * Default sweep rate in millis
54: */
55: private static final long DEFAULT_SWEEP_RATE = 30000L;
56:
57: private Set<Class<?>> inferredClasses;
58:
59: private TtlCache cache;
60:
61: // Each repository can have its own lock and they could be acquired by this
62: // instance itself, no need to pass this burden to callers
63: private final Lock readLock;
64: private final Lock writeLock;
65:
66: private final ScheduledExecutorService sweeperScheduler;
67: private Future<?> sweeperFuture;
68: private long initDelay;
69: private long sweepRate;
70: private long timeToLive;
71: private volatile boolean sweepRunning;
72:
73: public TtlCacheManager(Map<String, String> properties) {
74: this.cache = new TtlCache();
75: initSettings(properties);
76: final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
77: this.readLock = lock.readLock();
78: this.writeLock = lock.writeLock();
79: this.sweeperScheduler = Executors.newSingleThreadScheduledExecutor();
80: this.sweeperFuture = sweeperScheduler.scheduleAtFixedRate(new CacheSweeper(), initDelay, sweepRate,
81: TimeUnit.MILLISECONDS);
82: }
83:
84: private void initSettings(Map<String, String> properties) {
85: if (!properties.containsKey(JOPAPersistenceProperties.CACHE_TTL)) {
86: this.timeToLive = DEFAULT_TTL;
87: } else {
88: final String strCacheTtl = properties.get(JOPAPersistenceProperties.CACHE_TTL);
89: try {
90: // The property is in seconds, we need milliseconds
91: this.timeToLive = Long.parseLong(strCacheTtl) * 1000;
92: } catch (NumberFormatException e) {
93: LOG.warn("Unable to parse cache time to live setting value {}, using default value.", strCacheTtl);
94: this.timeToLive = DEFAULT_TTL;
95: }
96: }
97: if (!properties.containsKey(JOPAPersistenceProperties.CACHE_SWEEP_RATE)) {
98: this.sweepRate = DEFAULT_SWEEP_RATE;
99: } else {
100: final String strSweepRate = properties
101: .get(JOPAPersistenceProperties.CACHE_SWEEP_RATE);
102: try {
103: // The property is in seconds, we need milliseconds
104: this.sweepRate = Long.parseLong(strSweepRate) * 1000;
105: } catch (NumberFormatException e) {
106: LOG.warn("Unable to parse sweep rate setting value {}, using default value.", strSweepRate);
107: this.sweepRate = DEFAULT_SWEEP_RATE;
108: }
109: }
110: this.initDelay = DELAY_MULTIPLIER * sweepRate;
111: }
112:
113: @Override
114: public void close() {
115: if (sweeperFuture != null) {
116: LOG.debug("Stopping cache sweeper.");
117: sweeperFuture.cancel(true);
118: sweeperScheduler.shutdown();
119: }
120: this.sweeperFuture = null;
121: }
122:
123: @Override
124: public void add(Object primaryKey, Object entity, URI context) {
125: Objects.requireNonNull(primaryKey, ErrorUtils.constructNPXMessage("primaryKey"));
126: Objects.requireNonNull(entity, ErrorUtils.constructNPXMessage("entity"));
127:
128: acquireWriteLock();
129: try {
130: cache.put(primaryKey, entity, context);
131: } finally {
132: releaseWriteLock();
133: }
134: }
135:
136: /**
137: * Releases the live object cache.
138: */
139: private void releaseCache() {
140: acquireWriteLock();
141: try {
142: this.cache = new TtlCache();
143: } finally {
144: releaseWriteLock();
145: }
146: }
147:
148: @Override
149: public void clearInferredObjects() {
150: acquireWriteLock();
151: try {
152: getInferredClasses().forEach(cache::evict);
153: } finally {
154: releaseWriteLock();
155: }
156: }
157:
158: @Override
159: public <T> T get(Class<T> cls, Object primaryKey, URI context) {
160: if (cls == null || primaryKey == null) {
161: return null;
162: }
163: acquireReadLock();
164: try {
165: return cache.get(cls, primaryKey, context);
166: } finally {
167: releaseReadLock();
168: }
169: }
170:
171: /**
172: * Get the set of inferred classes.
173: * <p>
174: * Inferred classes (i. e. classes with inferred attributes) are tracked separately since they require special
175: * behavior.
176: *
177: * @return Set of inferred classes
178: */
179: public Set<Class<?>> getInferredClasses() {
180: if (inferredClasses == null) {
181: this.inferredClasses = new HashSet<>();
182: }
183: return inferredClasses;
184: }
185:
186: /**
187: * Set the inferred classes.
188: * <p>
189: * For more information about inferred classes see {@link #getInferredClasses()}.
190: *
191: * @param inferredClasses The set of inferred classes
192: */
193: @Override
194: public void setInferredClasses(Set<Class<?>> inferredClasses) {
195: this.inferredClasses = inferredClasses;
196: }
197:
198: @Override
199: public boolean contains(Class<?> cls, Object primaryKey) {
200: if (cls == null || primaryKey == null) {
201: return false;
202: }
203: acquireReadLock();
204: try {
205: return cache.contains(cls, primaryKey);
206: } finally {
207: releaseReadLock();
208: }
209: }
210:
211: @Override
212: public boolean contains(Class<?> cls, Object primaryKey, URI context) {
213: if (cls == null || primaryKey == null) {
214: return false;
215: }
216: acquireReadLock();
217: try {
218: return cache.contains(cls, primaryKey, context);
219: } finally {
220: releaseReadLock();
221: }
222: }
223:
224: @Override
225: public void evict(Class<?> cls) {
226: Objects.requireNonNull(cls, ErrorUtils.constructNPXMessage("cls"));
227:
228: acquireWriteLock();
229: try {
230: cache.evict(cls);
231: } finally {
232: releaseWriteLock();
233: }
234: }
235:
236: @Override
237: public void evict(Class<?> cls, Object primaryKey, URI context) {
238: Objects.requireNonNull(cls, ErrorUtils.constructNPXMessage("cls"));
239: Objects.requireNonNull(primaryKey, ErrorUtils.constructNPXMessage("primaryKey"));
240:
241: acquireWriteLock();
242: try {
243: cache.evict(cls, primaryKey, context);
244: } finally {
245: releaseWriteLock();
246: }
247:
248: }
249:
250: @Override
251: public void evict(URI context) {
252: acquireWriteLock();
253: try {
254: cache.evict(context);
255: } finally {
256: releaseWriteLock();
257: }
258: }
259:
260: @Override
261: public void evictAll() {
262: releaseCache();
263: }
264:
265: private void acquireReadLock() {
266: readLock.lock();
267: }
268:
269: private void releaseReadLock() {
270: readLock.unlock();
271: }
272:
273: private void acquireWriteLock() {
274: writeLock.lock();
275: }
276:
277: private void releaseWriteLock() {
278: writeLock.unlock();
279: }
280:
281: /**
282: * Sweeps the second level cache and removes entities with no more time to live.
283: */
284: private final class CacheSweeper implements Runnable {
285:
286: public void run() {
287: LOG.trace("Running cache sweep.");
288: TtlCacheManager.this.acquireWriteLock();
289: try {
290:• if (TtlCacheManager.this.sweepRunning) {
291: return;
292: }
293: TtlCacheManager.this.sweepRunning = true;
294: final long currentTime = System.currentTimeMillis();
295: final long timeToLive = TtlCacheManager.this.timeToLive;
296: final List<URI> toEvict = new ArrayList<>();
297: // Mark the objects for eviction (can't evict them now, it would
298: // cause ConcurrentModificationException)
299:• for (Entry<URI, Long> e : cache.ttls.entrySet()) {
300: final long lm = e.getValue();
301:• if (lm + timeToLive < currentTime) {
302: toEvict.add(e.getKey());
303: }
304: }
305: // Evict them
306: toEvict.forEach(TtlCacheManager.this::evict);
307: } finally {
308: TtlCacheManager.this.sweepRunning = false;
309: TtlCacheManager.this.releaseWriteLock();
310: }
311: }
312: }
313:
314: private static final class TtlCache extends EntityCache {
315:
316: private final Map<URI, Long> ttls = new HashMap<>();
317:
318: void put(Object identifier, Object entity, URI context) {
319: super.put(identifier, entity, context);
320: final URI ctx = context != null ? context : defaultContext;
321: updateTimeToLive(ctx);
322: }
323:
324: <T> T get(Class<T> cls, Object identifier, URI context) {
325: assert cls != null;
326: assert identifier != null;
327:
328: final URI ctx = context != null ? context : defaultContext;
329: T result = super.get(cls, identifier, ctx);
330: if (result != null) {
331: updateTimeToLive(ctx);
332: }
333: return result;
334: }
335:
336: private void updateTimeToLive(URI context) {
337: assert context != null;
338:
339: ttls.put(context, System.currentTimeMillis());
340: }
341:
342: void evict(URI context) {
343: final URI ctx = context != null ? context : defaultContext;
344: super.evict(ctx);
345: ttls.remove(context);
346: }
347:
348: void evict(Class<?> cls) {
349: for (Entry<URI, Map<Object, Map<Class<?>, Object>>> e : repoCache.entrySet()) {
350: final Map<Object, Map<Class<?>, Object>> m = e.getValue();
351: for (Entry<Object, Map<Class<?>, Object>> indNode : m.entrySet()) {
352: indNode.getValue().remove(cls);
353: }
354: if (m.isEmpty()) {
355: ttls.remove(e.getKey());
356: }
357: }
358: }
359: }
360: }