Package: SnapshotStorageConnector
SnapshotStorageConnector
name | instruction | branch | complexity | line | method | ||||||||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
SnapshotStorageConnector(AbstractStorageConnector) |
|
|
|
|
|
||||||||||||||||||||
add(List, String) |
|
|
|
|
|
||||||||||||||||||||
applyAdditions() |
|
|
|
|
|
||||||||||||||||||||
applyRemovals() |
|
|
|
|
|
||||||||||||||||||||
applyTransactionUpdateQueries() |
|
|
|
|
|
||||||||||||||||||||
begin() |
|
|
|
|
|
||||||||||||||||||||
cleanup() |
|
|
|
|
|
||||||||||||||||||||
commit() |
|
|
|
|
|
||||||||||||||||||||
contains(Resource, Property, RDFNode, Collection) |
|
|
|
|
|
||||||||||||||||||||
executeAskQuery(Query, Statement.StatementOntology) |
|
|
|
|
|
||||||||||||||||||||
executeSelectQuery(Query, Statement.StatementOntology) |
|
|
|
|
|
||||||||||||||||||||
executeUpdate(String, Statement.StatementOntology) |
|
|
|
|
|
||||||||||||||||||||
find(Resource, Property, RDFNode, Collection) |
|
|
|
|
|
||||||||||||||||||||
getContexts() |
|
|
|
|
|
||||||||||||||||||||
lambda$applyAdditions$1(Dataset, String) |
|
|
|
|
|
||||||||||||||||||||
lambda$applyRemovals$0(Dataset, String) |
|
|
|
|
|
||||||||||||||||||||
lambda$contains$3(Resource, Property, RDFNode, String) |
|
|
|
|
|
||||||||||||||||||||
lambda$find$2(Resource, Property, RDFNode, String) |
|
|
|
|
|
||||||||||||||||||||
remove(List, String) |
|
|
|
|
|
||||||||||||||||||||
remove(Resource, Property, RDFNode, String) |
|
|
|
|
|
||||||||||||||||||||
rollback() |
|
|
|
|
|
||||||||||||||||||||
snapshotCentralDataset() |
|
|
|
|
|
Coverage
1: /**
2: * Copyright (C) 2022 Czech Technical University in Prague
3: *
4: * This program is free software: you can redistribute it and/or modify it under
5: * the terms of the GNU General Public License as published by the Free Software
6: * Foundation, either version 3 of the License, or (at your option) any
7: * later version.
8: *
9: * This program is distributed in the hope that it will be useful, but WITHOUT
10: * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
11: * FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
12: * details. You should have received a copy of the GNU General Public License
13: * along with this program. If not, see <http://www.gnu.org/licenses/>.
14: */
15: package cz.cvut.kbss.ontodriver.jena.connector;
16:
17: import cz.cvut.kbss.ontodriver.Statement.StatementOntology;
18: import cz.cvut.kbss.ontodriver.jena.config.JenaConfigParam;
19: import cz.cvut.kbss.ontodriver.jena.exception.JenaDriverException;
20: import cz.cvut.kbss.ontodriver.jena.query.AbstractResultSet;
21: import org.apache.jena.query.Dataset;
22: import org.apache.jena.query.Query;
23: import org.apache.jena.rdf.model.Property;
24: import org.apache.jena.rdf.model.RDFNode;
25: import org.apache.jena.rdf.model.Resource;
26: import org.apache.jena.rdf.model.Statement;
27: import org.apache.jena.update.UpdateAction;
28:
29: import java.util.*;
30: import java.util.stream.Collectors;
31:
32: /**
33: * This connector implements the {@link cz.cvut.kbss.ontodriver.jena.config.JenaOntoDriverProperties#SNAPSHOT}-based
34: * transactional strategy.
35: * <p>
36: * It is also used when inference is required from the driver.
37: */
38: public class SnapshotStorageConnector extends SharedStorageConnector {
39:
40: final AbstractStorageConnector centralConnector;
41:
42: private LocalModel transactionalChanges;
43: private List<String> transactionalUpdates;
44:
45: SnapshotStorageConnector(AbstractStorageConnector centralConnector) {
46: super(centralConnector.configuration);
47: this.centralConnector = centralConnector;
48: }
49:
50: @Override
51: public void begin() {
52: ensureOpen();
53:• if (transaction.isActive()) {
54: throw new IllegalStateException("Transaction is already active.");
55: }
56: transaction.begin();
57: snapshotCentralDataset();
58: this.transactionalUpdates = new ArrayList<>();
59: this.transactionalChanges = new LocalModel(configuration.is(JenaConfigParam.TREAT_DEFAULT_GRAPH_AS_UNION));
60: }
61:
62: void snapshotCentralDataset() {
63: final SnapshotStorage s = new SnapshotStorage(configuration);
64: s.addCentralData(centralConnector.getStorage().getDataset());
65: this.storage = s;
66: }
67:
68: @Override
69: public void commit() throws JenaDriverException {
70: ensureTransactionalState();
71: transaction.commit();
72: try {
73: centralConnector.begin();
74: applyRemovals();
75: applyAdditions();
76: applyTransactionUpdateQueries();
77: centralConnector.commit();
78: } finally {
79: cleanup();
80: transaction.afterCommit();
81: }
82:
83: }
84:
85: private void applyRemovals() {
86: final Dataset removed = transactionalChanges.getRemoved();
87: centralConnector.remove(removed.getDefaultModel().listStatements().toList(), null);
88: removed.listNames()
89: .forEachRemaining(n -> centralConnector.remove(removed.getNamedModel(n).listStatements().toList(), n));
90: }
91:
92: private void applyAdditions() {
93: final Dataset added = transactionalChanges.getAdded();
94: centralConnector.add(added.getDefaultModel().listStatements().toList(), null);
95: added.listNames()
96: .forEachRemaining(n -> centralConnector.add(added.getNamedModel(n).listStatements().toList(), n));
97: }
98:
99: private void applyTransactionUpdateQueries() throws JenaDriverException {
100:• for (String query : transactionalUpdates) {
101: centralConnector.executeUpdate(query, StatementOntology.CENTRAL);
102: }
103: }
104:
105: @Override
106: public void rollback() {
107: ensureTransactionalState();
108: transaction.rollback();
109: cleanup();
110: transaction.afterRollback();
111: }
112:
113: private void cleanup() {
114: this.storage = null;
115: this.transactionalChanges = null;
116: this.transactionalUpdates = null;
117: }
118:
119: @Override
120: public List<Statement> find(Resource subject, Property property, RDFNode value, Collection<String> contexts) {
121: ensureTransactionalState();
122:• if (contexts.isEmpty()) {
123: return storage.getDefaultGraph().listStatements(subject, property, value).toList();
124: } else {
125: return contexts.stream()
126: .map(ctx -> storage.getNamedGraph(ctx).listStatements(subject, property, value).toList())
127: .flatMap(Collection::stream).collect(Collectors.toList());
128: }
129: }
130:
131: @Override
132: public boolean contains(Resource subject, Property property, RDFNode value, Collection<String> contexts) {
133: ensureTransactionalState();
134:• if (contexts.isEmpty()) {
135: return storage.getDefaultGraph().contains(subject, property, value);
136: } else {
137: return contexts.stream().anyMatch(c -> storage.getNamedGraph(c).contains(subject, property, value));
138: }
139: }
140:
141: @Override
142: public List<String> getContexts() {
143: ensureTransactionalState();
144: final Iterator<String> contexts = storage.getDataset().listNames();
145: final List<String> result = new ArrayList<>();
146: contexts.forEachRemaining(result::add);
147: return result;
148: }
149:
150: @Override
151: public void add(List<Statement> statements, String context) {
152: ensureTransactionalState();
153: storage.add(statements, context);
154: transactionalChanges.addStatements(statements, context);
155: }
156:
157: @Override
158: public void remove(List<Statement> statements, String context) {
159: ensureTransactionalState();
160: storage.remove(statements, context);
161: transactionalChanges.removeStatements(statements, context);
162: }
163:
164: @Override
165: public void remove(Resource subject, Property property, RDFNode object, String context) {
166: ensureTransactionalState();
167:• final List<Statement> toRemove = find(subject, property, object,
168: context != null ? Collections.singleton(context) : Collections.emptySet());
169: remove(toRemove, context);
170: }
171:
172: @Override
173: public AbstractResultSet executeSelectQuery(Query query, StatementOntology target) throws JenaDriverException {
174: ensureOpen();
175:• if (target == StatementOntology.TRANSACTIONAL) {
176: transaction.verifyActive();
177: return super.executeSelectQuery(query, target);
178: } else {
179: return centralConnector.executeSelectQuery(query, target);
180: }
181: }
182:
183: @Override
184: public AbstractResultSet executeAskQuery(Query query, StatementOntology target) throws JenaDriverException {
185: ensureOpen();
186:• if (target == StatementOntology.TRANSACTIONAL) {
187: transaction.verifyActive();
188: return super.executeAskQuery(query, target);
189: } else {
190: return centralConnector.executeAskQuery(query, target);
191: }
192: }
193:
194: @Override
195: public void executeUpdate(String query, StatementOntology target) throws JenaDriverException {
196: ensureOpen();
197:• if (target == StatementOntology.TRANSACTIONAL) {
198: transaction.verifyActive();
199: try {
200: UpdateAction.parseExecute(query, storage.getDataset());
201: transactionalUpdates.add(query);
202: } catch (RuntimeException e) {
203: throw new JenaDriverException("Execution of update " + query + " failed.", e);
204: }
205: } else {
206: centralConnector.executeUpdate(query, target);
207: }
208: }
209: }