Skip to contentMethod: commit()
1: /**
2: * Copyright (C) 2022 Czech Technical University in Prague
3: *
4: * This program is free software: you can redistribute it and/or modify it under
5: * the terms of the GNU General Public License as published by the Free Software
6: * Foundation, either version 3 of the License, or (at your option) any
7: * later version.
8: *
9: * This program is distributed in the hope that it will be useful, but WITHOUT
10: * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
11: * FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
12: * details. You should have received a copy of the GNU General Public License
13: * along with this program. If not, see <http://www.gnu.org/licenses/>.
14: */
15: package cz.cvut.kbss.ontodriver.jena.connector;
16:
17: import cz.cvut.kbss.ontodriver.config.DriverConfiguration;
18: import cz.cvut.kbss.ontodriver.jena.config.JenaConfigParam;
19: import org.apache.jena.query.Dataset;
20: import org.apache.jena.query.Query;
21: import org.apache.jena.query.QueryExecution;
22: import org.apache.jena.query.ReadWrite;
23: import org.apache.jena.rdf.model.Model;
24: import org.apache.jena.rdf.model.ModelFactory;
25: import org.apache.jena.rdf.model.Statement;
26: import org.apache.jena.rdf.model.StmtIterator;
27: import org.apache.jena.rdfconnection.RDFConnection;
28: import org.apache.jena.rdfconnection.RDFConnectionFuseki;
29: import org.apache.jena.sparql.core.Transactional;
30:
31: import java.util.List;
32:
33: /**
34: * Represents a connection to a Jena Fuseki server.
35: */
36: class FusekiStorage implements Storage {
37:
38: private final boolean defaultAsUnion;
39: private final String serverUrl;
40:
41: private RDFConnection connection;
42:
43: FusekiStorage(DriverConfiguration configuration) {
44: this.defaultAsUnion = configuration.is(JenaConfigParam.TREAT_DEFAULT_GRAPH_AS_UNION);
45: this.serverUrl = configuration.getStorageProperties().getPhysicalURI().toString();
46: }
47:
48: private RDFConnection connect() {
49: if (connection == null) {
50: this.connection = RDFConnectionFuseki.create().destination(serverUrl).build();
51: }
52: return connection;
53: }
54:
55: @Override
56: public Transactional getTransactional() {
57: return connect();
58: }
59:
60: @Override
61: public Dataset getDataset() {
62: return connect().fetchDataset();
63: }
64:
65: @Override
66: public Model getDefaultGraph() {
67: if (defaultAsUnion) {
68: return ModelFactory.createUnion(connect().fetch(), getDataset().getUnionModel());
69: } else {
70: return connect().fetch();
71: }
72: }
73:
74: @Override
75: public Model getNamedGraph(String ctx) {
76: return connect().fetch(ctx);
77: }
78:
79: @Override
80: public void begin(ReadWrite readWrite) {
81: connect().begin(readWrite);
82: }
83:
84: @Override
85: public void commit() {
86: connection.commit();
87: closeConnection();
88: }
89:
90: @Override
91: public void rollback() {
92: connection.abort();
93: closeConnection();
94: }
95:
96: @Override
97: public void close() {
98: closeConnection();
99: }
100:
101: private void closeConnection() {
102: if (connection != null) {
103: connection.close();
104: this.connection = null;
105: }
106: }
107:
108: @Override
109: public void add(List<Statement> statements, String context) {
110: assert connection != null && connection.isInTransaction();
111: if (statements.isEmpty()) {
112: return;
113: }
114: final Model model = ModelFactory.createDefaultModel().add(statements);
115: if (context != null) {
116: connection.load(context, model);
117: } else {
118: connection.load(model);
119: }
120: }
121:
122: @Override
123: public void remove(List<Statement> statements, String context) {
124: assert connection != null && connection.isInTransaction();
125: if (statements.isEmpty()) {
126: return;
127: }
128: // Note that given the way Fuseki connection works, this can be quite inefficient (fetch model, update it, upload it again)
129: // So translation to a SPARQL update may be more appropriate
130: if (context != null) {
131: final Model m = connection.fetch(context);
132: m.remove(statements);
133: connection.put(context, m);
134: } else {
135: final Model def = connection.fetch();
136: def.remove(statements);
137: connection.put(def);
138: if (defaultAsUnion) {
139: connection.querySelect("SELECT ?g WHERE { ?g {} }", qs -> {
140: final String ctx = qs.getResource("g").getURI();
141: final Model m = connect().fetch(ctx);
142: m.remove(statements);
143: connection.put(ctx, m);
144: });
145: }
146: }
147: }
148:
149: @Override
150: public void remove(StmtIterator iterator, String context) {
151: assert connection != null && connection.isInTransaction();
152: final List<Statement> statements = iterator.toList();
153: remove(statements, context);
154: }
155:
156: @Override
157: public QueryExecution prepareQuery(Query query) {
158: return connect().query(query);
159: }
160:
161: @Override
162: public void executeUpdate(String update) {
163: connect().update(update);
164: }
165: }