Skip to content

Method: commit()

1: package cz.cvut.kbss.ontodriver.jena.connector;
2:
3: import cz.cvut.kbss.ontodriver.config.DriverConfiguration;
4: import cz.cvut.kbss.ontodriver.jena.config.JenaConfigParam;
5: import org.apache.jena.query.Dataset;
6: import org.apache.jena.query.Query;
7: import org.apache.jena.query.QueryExecution;
8: import org.apache.jena.query.ReadWrite;
9: import org.apache.jena.rdf.model.Model;
10: import org.apache.jena.rdf.model.ModelFactory;
11: import org.apache.jena.rdf.model.Statement;
12: import org.apache.jena.rdf.model.StmtIterator;
13: import org.apache.jena.rdfconnection.RDFConnection;
14: import org.apache.jena.rdfconnection.RDFConnectionFuseki;
15: import org.apache.jena.sparql.core.Transactional;
16:
17: import java.util.List;
18:
19: /**
20: * Represents a connection to a Jena Fuseki server.
21: */
22: class FusekiStorage implements Storage {
23:
24: private final boolean defaultAsUnion;
25: private final String serverUrl;
26:
27: private RDFConnection connection;
28:
29: FusekiStorage(DriverConfiguration configuration) {
30: this.defaultAsUnion = configuration.is(JenaConfigParam.TREAT_DEFAULT_GRAPH_AS_UNION);
31: this.serverUrl = configuration.getStorageProperties().getPhysicalURI().toString();
32: }
33:
34: private RDFConnection connect() {
35: if (connection == null) {
36: this.connection = RDFConnectionFuseki.create().destination(serverUrl).build();
37: }
38: return connection;
39: }
40:
41: @Override
42: public Transactional getTransactional() {
43: return connect();
44: }
45:
46: @Override
47: public Dataset getDataset() {
48: return connect().fetchDataset();
49: }
50:
51: @Override
52: public Model getDefaultGraph() {
53: if (defaultAsUnion) {
54: return ModelFactory.createUnion(connect().fetch(), getDataset().getUnionModel());
55: } else {
56: return connect().fetch();
57: }
58: }
59:
60: @Override
61: public Model getNamedGraph(String ctx) {
62: return connect().fetch(ctx);
63: }
64:
65: @Override
66: public void begin(ReadWrite readWrite) {
67: connect().begin(readWrite);
68: }
69:
70: @Override
71: public void commit() {
72: connection.commit();
73: closeConnection();
74: }
75:
76: @Override
77: public void rollback() {
78: connection.abort();
79: closeConnection();
80: }
81:
82: @Override
83: public void close() {
84: closeConnection();
85: }
86:
87: private void closeConnection() {
88: if (connection != null) {
89: connection.close();
90: this.connection = null;
91: }
92: }
93:
94: @Override
95: public void add(List<Statement> statements, String context) {
96: assert connection != null && connection.isInTransaction();
97: if (statements.isEmpty()) {
98: return;
99: }
100: final Model model = ModelFactory.createDefaultModel().add(statements);
101: if (context != null) {
102: connection.load(context, model);
103: } else {
104: connection.load(model);
105: }
106: }
107:
108: @Override
109: public void remove(List<Statement> statements, String context) {
110: assert connection != null && connection.isInTransaction();
111: if (statements.isEmpty()) {
112: return;
113: }
114: // Note that given the way Fuseki connection works, this can be quite inefficient (fetch model, update it, upload it again)
115: // So translation to a SPARQL update may be more appropriate
116: if (context != null) {
117: final Model m = connection.fetch(context);
118: m.remove(statements);
119: connection.put(context, m);
120: } else {
121: final Model def = connection.fetch();
122: def.remove(statements);
123: connection.put(def);
124: if (defaultAsUnion) {
125: connection.querySelect("SELECT ?g WHERE { ?g {} }", qs -> {
126: final String ctx = qs.getResource("g").getURI();
127: final Model m = connect().fetch(ctx);
128: m.remove(statements);
129: connection.put(ctx, m);
130: });
131: }
132: }
133: }
134:
135: @Override
136: public void remove(StmtIterator iterator, String context) {
137: assert connection != null && connection.isInTransaction();
138: final List<Statement> statements = iterator.toList();
139: remove(statements, context);
140: }
141:
142: @Override
143: public QueryExecution prepareQuery(Query query) {
144: return connect().query(query);
145: }
146:
147: @Override
148: public void executeUpdate(String update) {
149: connect().update(update);
150: }
151: }