Skip to contentMethod: commit()
1: /*
2: * JOPA
3: * Copyright (C) 2024 Czech Technical University in Prague
4: *
5: * This library is free software; you can redistribute it and/or
6: * modify it under the terms of the GNU Lesser General Public
7: * License as published by the Free Software Foundation; either
8: * version 3.0 of the License, or (at your option) any later version.
9: *
10: * This library is distributed in the hope that it will be useful,
11: * but WITHOUT ANY WARRANTY; without even the implied warranty of
12: * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13: * Lesser General Public License for more details.
14: *
15: * You should have received a copy of the GNU Lesser General Public
16: * License along with this library.
17: */
18: package cz.cvut.kbss.ontodriver.jena.connector;
19:
20: import cz.cvut.kbss.ontodriver.config.DriverConfiguration;
21: import cz.cvut.kbss.ontodriver.jena.config.JenaConfigParam;
22: import org.apache.jena.query.Dataset;
23: import org.apache.jena.query.Query;
24: import org.apache.jena.query.QueryExecution;
25: import org.apache.jena.query.ReadWrite;
26: import org.apache.jena.rdf.model.Model;
27: import org.apache.jena.rdf.model.ModelFactory;
28: import org.apache.jena.rdf.model.Statement;
29: import org.apache.jena.rdf.model.StmtIterator;
30: import org.apache.jena.rdfconnection.RDFConnection;
31: import org.apache.jena.rdfconnection.RDFConnectionFuseki;
32: import org.apache.jena.sparql.core.Transactional;
33:
34: import java.util.List;
35:
36: /**
37: * Represents a connection to a Jena Fuseki server.
38: */
39: class FusekiStorage implements Storage {
40:
41: private final boolean defaultAsUnion;
42: private final String serverUrl;
43:
44: private RDFConnection connection;
45:
46: FusekiStorage(DriverConfiguration configuration) {
47: this.defaultAsUnion = configuration.is(JenaConfigParam.TREAT_DEFAULT_GRAPH_AS_UNION);
48: this.serverUrl = configuration.getStorageProperties().getPhysicalURI().toString();
49: }
50:
51: private RDFConnection connect() {
52: if (connection == null) {
53: this.connection = RDFConnectionFuseki.create().destination(serverUrl).build();
54: }
55: return connection;
56: }
57:
58: @Override
59: public Transactional getTransactional() {
60: return connect();
61: }
62:
63: @Override
64: public Dataset getDataset() {
65: return connect().fetchDataset();
66: }
67:
68: @Override
69: public Model getDefaultGraph() {
70: if (defaultAsUnion) {
71: return ModelFactory.createUnion(connect().fetch(), getDataset().getUnionModel());
72: } else {
73: return connect().fetch();
74: }
75: }
76:
77: @Override
78: public Model getNamedGraph(String ctx) {
79: return connect().fetch(ctx);
80: }
81:
82: @Override
83: public void begin(ReadWrite readWrite) {
84: connect().begin(readWrite);
85: }
86:
87: @Override
88: public void commit() {
89: connection.commit();
90: closeConnection();
91: }
92:
93: @Override
94: public void rollback() {
95: connection.abort();
96: closeConnection();
97: }
98:
99: @Override
100: public void close() {
101: closeConnection();
102: }
103:
104: private void closeConnection() {
105: if (connection != null) {
106: connection.close();
107: this.connection = null;
108: }
109: }
110:
111: @Override
112: public void add(List<Statement> statements, String context) {
113: assert connection != null && connection.isInTransaction();
114: if (statements.isEmpty()) {
115: return;
116: }
117: final Model model = ModelFactory.createDefaultModel().add(statements);
118: if (context != null) {
119: connection.load(context, model);
120: } else {
121: connection.load(model);
122: }
123: }
124:
125: @Override
126: public void remove(List<Statement> statements, String context) {
127: assert connection != null && connection.isInTransaction();
128: if (statements.isEmpty()) {
129: return;
130: }
131: // Note that given the way Fuseki connection works, this can be quite inefficient (fetch model, update it, upload it again)
132: // So translation to a SPARQL update may be more appropriate
133: if (context != null) {
134: final Model m = connection.fetch(context);
135: m.remove(statements);
136: connection.put(context, m);
137: } else {
138: final Model def = connection.fetch();
139: def.remove(statements);
140: connection.put(def);
141: if (defaultAsUnion) {
142: connection.querySelect("SELECT ?g WHERE { ?g {} }", qs -> {
143: final String ctx = qs.getResource("g").getURI();
144: final Model m = connect().fetch(ctx);
145: m.remove(statements);
146: connection.put(ctx, m);
147: });
148: }
149: }
150: }
151:
152: @Override
153: public void remove(StmtIterator iterator, String context) {
154: assert connection != null && connection.isInTransaction();
155: final List<Statement> statements = iterator.toList();
156: remove(statements, context);
157: }
158:
159: @Override
160: public QueryExecution prepareQuery(Query query) {
161: return connect().query(query);
162: }
163:
164: @Override
165: public void executeUpdate(String update) {
166: connect().update(update);
167: }
168: }