Skip to content | Method: acquire(int)
1: /*
2: * JOPA
3: * Copyright (C) 2023 Czech Technical University in Prague
4: *
5: * This library is free software; you can redistribute it and/or
6: * modify it under the terms of the GNU Lesser General Public
7: * License as published by the Free Software Foundation; either
8: * version 3.0 of the License, or (at your option) any later version.
9: *
10: * This library is distributed in the hope that it will be useful,
11: * but WITHOUT ANY WARRANTY; without even the implied warranty of
12: * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13: * Lesser General Public License for more details.
14: *
15: * You should have received a copy of the GNU Lesser General Public
16: * License along with this library.
17: */
18: package cz.cvut.kbss.ontodriver.rdf4j.connector;
19:
20: import cz.cvut.kbss.ontodriver.Closeable;
21: import cz.cvut.kbss.ontodriver.Wrapper;
22: import cz.cvut.kbss.ontodriver.config.DriverConfiguration;
23: import cz.cvut.kbss.ontodriver.exception.OntoDriverException;
24: import cz.cvut.kbss.ontodriver.rdf4j.config.Constants;
25: import cz.cvut.kbss.ontodriver.rdf4j.config.Rdf4jConfigParam;
26: import cz.cvut.kbss.ontodriver.rdf4j.config.Rdf4jOntoDriverProperties;
27: import cz.cvut.kbss.ontodriver.rdf4j.connector.init.RemoteRepositoryWrapper;
28: import cz.cvut.kbss.ontodriver.rdf4j.exception.Rdf4jDriverException;
29: import cz.cvut.kbss.ontodriver.rdf4j.exception.RepositoryCreationException;
30: import cz.cvut.kbss.ontodriver.rdf4j.exception.RepositoryNotFoundException;
31: import org.eclipse.rdf4j.model.Model;
32: import org.eclipse.rdf4j.model.Resource;
33: import org.eclipse.rdf4j.model.ValueFactory;
34: import org.eclipse.rdf4j.model.vocabulary.CONFIG;
35: import org.eclipse.rdf4j.model.vocabulary.RDF;
36: import org.eclipse.rdf4j.repository.Repository;
37: import org.eclipse.rdf4j.repository.RepositoryConnection;
38: import org.eclipse.rdf4j.repository.RepositoryException;
39: import org.eclipse.rdf4j.repository.config.RepositoryConfig;
40: import org.eclipse.rdf4j.repository.config.RepositoryConfigException;
41: import org.eclipse.rdf4j.repository.config.RepositoryConfigSchema;
42: import org.eclipse.rdf4j.repository.http.HTTPRepository;
43: import org.eclipse.rdf4j.repository.manager.RemoteRepositoryManager;
44: import org.eclipse.rdf4j.repository.manager.RepositoryManager;
45: import org.eclipse.rdf4j.repository.manager.RepositoryProvider;
46: import org.eclipse.rdf4j.repository.sail.SailRepository;
47: import org.eclipse.rdf4j.repository.sail.config.SailRepositoryConfig;
48: import org.eclipse.rdf4j.rio.RDFFormat;
49: import org.eclipse.rdf4j.rio.Rio;
50: import org.eclipse.rdf4j.sail.Sail;
51: import org.eclipse.rdf4j.sail.config.SailImplConfig;
52: import org.eclipse.rdf4j.sail.helpers.SailWrapper;
53: import org.eclipse.rdf4j.sail.inferencer.fc.SchemaCachingRDFSInferencer;
54: import org.eclipse.rdf4j.sail.inferencer.fc.config.SchemaCachingRDFSInferencerConfig;
55: import org.eclipse.rdf4j.sail.memory.MemoryStore;
56: import org.eclipse.rdf4j.sail.nativerdf.config.NativeStoreConfig;
57: import org.slf4j.Logger;
58: import org.slf4j.LoggerFactory;
59:
60: import java.io.FileInputStream;
61: import java.io.FileNotFoundException;
62: import java.io.IOException;
63: import java.io.InputStream;
64: import java.net.URI;
65: import java.util.Objects;
66: import java.util.Optional;
67: import java.util.Set;
68:
69: public class StorageConnector implements Closeable, Wrapper {
70:
71: private static final Logger LOG = LoggerFactory.getLogger(StorageConnector.class);
72:
73: private static final String[] KNOWN_REMOTE_SCHEMES = {"http", "https", "ftp"};
74: private static final String LOCAL_NATIVE_REPO = "repositories/";
75: private static final String FILE_SCHEME = "file";
76: private static final String CLASSPATH_PREFIX = "classpath:";
77:
78: private final DriverConfiguration configuration;
79: private final int maxReconnectAttempts;
80:
81: private RepositoryManager manager;
82: private Repository repository;
83:
84: private boolean open;
85:
/**
 * Creates a connector for the specified driver configuration.
 * <p>
 * The repository is not accessed here; {@link #initializeRepository()} must be called before the connector is
 * considered open.
 *
 * @param configuration Driver configuration, must not be {@code null}
 * @throws Rdf4jDriverException When the configured number of reconnect attempts is not a non-negative integer
 * @throws NullPointerException When {@code configuration} is {@code null}
 */
public StorageConnector(DriverConfiguration configuration) throws Rdf4jDriverException {
    // Fail fast with a named argument instead of an anonymous NPE while resolving the reconnect attempts.
    this.configuration = Objects.requireNonNull(configuration, "configuration");
    this.maxReconnectAttempts = resolveMaxReconnectAttempts();
}
90:
91: private int resolveMaxReconnectAttempts() throws Rdf4jDriverException {
92: try {
93: final int attempts = configuration.isSet(Rdf4jConfigParam.RECONNECT_ATTEMPTS) ? Integer.parseInt(
94: configuration.getProperty(Rdf4jConfigParam.RECONNECT_ATTEMPTS)) :
95: Constants.DEFAULT_RECONNECT_ATTEMPTS_COUNT;
96: if (attempts < 0) {
97: throw invalidReconnectAttemptsConfig();
98: }
99: return attempts;
100: } catch (NumberFormatException e) {
101: throw invalidReconnectAttemptsConfig();
102: }
103: }
104:
105: private static Rdf4jDriverException invalidReconnectAttemptsConfig() {
106: return new Rdf4jDriverException(
107: "Invalid value of configuration parameter " + Rdf4jOntoDriverProperties.RECONNECT_ATTEMPTS +
108: ". Must be a non-negative integer.");
109: }
110:
111: public void initializeRepository() throws Rdf4jDriverException {
112: final URI serverUri = configuration.getStorageProperties().getPhysicalURI();
113: LOG.debug("Initializing connector to repository at {}", serverUri);
114: try {
115: final boolean isRemote = isRemoteRepository(serverUri);
116: if (isRemote) {
117: this.repository = connectToRemoteRepository(serverUri.toString());
118: } else {
119: this.repository = createLocalRepository();
120: }
121: verifyRepositoryCreated(serverUri, isRemote);
122: repository.init();
123: } catch (RepositoryException | RepositoryConfigException e) {
124: throw new Rdf4jDriverException("Failed to acquire RDF4J repository connection.", e);
125: }
126: this.open = true;
127: }
128:
129: private static boolean isRemoteRepository(URI uri) {
130: final String scheme = uri.getScheme();
131: for (String s : KNOWN_REMOTE_SCHEMES) {
132: if (s.equals(scheme)) {
133: return true;
134: }
135: }
136: return false;
137: }
138:
139: private Repository connectToRemoteRepository(String repoUri) {
140: this.manager = RepositoryProvider.getRepositoryManagerOfRepository(repoUri);
141: final RemoteRepositoryManager remoteManager = (RemoteRepositoryManager) manager;
142: final String username = configuration.getStorageProperties().getUsername();
143: if (username != null) {
144: final String password = configuration.getStorageProperties().getPassword();
145: remoteManager.setUsernameAndPassword(username, password);
146: }
147: return connectToRemote(repoUri, 1);
148: }
149:
150: private Repository connectToRemote(String repoUri, int attempts) {
151: try {
152: return new RemoteRepositoryWrapper((HTTPRepository) manager.getRepository(RepositoryProvider.getRepositoryIdOfRepository(repoUri)), configuration);
153: } catch (RepositoryException e) {
154: if (attempts < maxReconnectAttempts) {
155: LOG.warn("Unable to connect to repository {}. Error is: {}. Retrying...", repoUri, e.getMessage());
156: return connectToRemote(repoUri, attempts + 1);
157: }
158: LOG.error("Threshold of failed connection attempts reached, throwing exception.");
159: throw e;
160: }
161: }
162:
163: private Repository createLocalRepository() {
164: if (configuration.isSet(Rdf4jConfigParam.REPOSITORY_CONFIG)) {
165: return createRepositoryFromConfig();
166: }
167: final URI localUri = configuration.getStorageProperties().getPhysicalURI();
168: if (!isFileUri(localUri) && configuration.is(Rdf4jConfigParam.USE_VOLATILE_STORAGE)) {
169: return createInMemoryRepository();
170: } else {
171: return createNativeRepository(configuration, localUri.toString());
172: }
173: }
174:
175: private Repository createRepositoryFromConfig() {
176: LOG.trace("Creating local repository from repository config file.");
177: final RepositoryConfig repoConfig = loadRepositoryConfig();
178: this.manager = RepositoryProvider.getRepositoryManager(getRepositoryManagerBaseDir().orElse(""));
179: manager.addRepositoryConfig(repoConfig);
180: return manager.getRepository(getRepositoryId());
181: }
182:
183: @SuppressWarnings("deprecated")
184: private RepositoryConfig loadRepositoryConfig() {
185: try (final InputStream is = getConfigFileContent()) {
186: final Model configModel = Rio.parse(is, "", RDFFormat.TURTLE);
187: Set<Resource> resources =
188: configModel.filter(null, RDF.TYPE, CONFIG.Rep.Repository).subjects();
189: if (resources.isEmpty()) {
190: // Support for legacy repository configuration vocabulary.
191: // https://rdf4j.org/documentation/reference/configuration/#migrating-old-configurations
192: resources = configModel.filter(null, RDF.TYPE, RepositoryConfigSchema.REPOSITORY).subjects();
193: }
194: assert resources.size() == 1;
195: return RepositoryConfig.create(configModel, resources.iterator().next());
196: } catch (IOException e) {
197: throw new RepositoryCreationException("Unable to create repository from the specified configuration.", e);
198: }
199: }
200:
201: private InputStream getConfigFileContent() {
202: final String configPath = configuration.getProperty(Rdf4jConfigParam.REPOSITORY_CONFIG);
203: LOG.trace("Loading repository configuration file content from {}.", configPath);
204: if (configPath.startsWith(CLASSPATH_PREFIX)) {
205: final InputStream is =
206: getClass().getClassLoader().getResourceAsStream(configPath.substring(CLASSPATH_PREFIX.length()));
207: if (is == null) {
208: throw new RepositoryCreationException(
209: "Unable to find repository configuration file on classpath location " + configPath);
210: }
211: return is;
212: } else {
213: try {
214: return new FileInputStream(configPath);
215: } catch (FileNotFoundException e) {
216: throw new RepositoryCreationException("Unable to find repository configuration file at " + configPath,
217: e);
218: }
219: }
220: }
221:
222: private Optional<String> getRepositoryManagerBaseDir() {
223: final String physicalUri = configuration.getStorageProperties().getPhysicalURI().toString();
224: final String[] tmp = physicalUri.split(LOCAL_NATIVE_REPO);
225: return tmp.length == 2 ? Optional.of(tmp[0]) : Optional.empty();
226: }
227:
228: private String getRepositoryId() {
229: final String physicalUri = configuration.getStorageProperties().getPhysicalURI().toString();
230: final String[] tmp = physicalUri.split(LOCAL_NATIVE_REPO);
231: if (tmp.length != 2) {
232: return physicalUri;
233: }
234: String repoId = tmp[1];
235: // Get rid of the trailing slash if necessary
236: return repoId.charAt(repoId.length() - 1) == '/' ? repoId.substring(0, repoId.length() - 1) : repoId;
237: }
238:
239: private static boolean isFileUri(URI uri) {
240: return uri.getScheme() != null && uri.getScheme().equals(FILE_SCHEME);
241: }
242:
243: /**
244: * Creates a local in-memory RDF4J repository which is disposed of when the VM shuts down.
245: */
246: private Repository createInMemoryRepository() {
247: LOG.trace("Creating local in-memory repository.");
248: final MemoryStore ms = new MemoryStore();
249: if (configuration.is(Rdf4jConfigParam.USE_INFERENCE)) {
250: return new SailRepository(new SchemaCachingRDFSInferencer(ms));
251: } else {
252: return new SailRepository(ms);
253: }
254: }
255:
256: /**
257: * Creates native repository.
258: * <p>
259: * This kind of repository stores data in files and is persistent after the VM shuts down.
260: */
261: private Repository createNativeRepository(DriverConfiguration configuration, String localUri) {
262: LOG.trace("Creating local native repository at " + localUri);
263: validateNativeStorePath(localUri);
264: try {
265: this.manager = RepositoryProvider.getRepositoryManagerOfRepository(localUri);
266: final String repoId = getRepositoryId();
267: final RepositoryConfig cfg = createLocalNativeRepositoryConfig(repoId, configuration);
268: manager.addRepositoryConfig(cfg);
269: return manager.getRepository(repoId);
270: } catch (RepositoryConfigException | RepositoryException e) {
271: throw new RepositoryCreationException("Unable to create local repository at " + localUri, e);
272: }
273: }
274:
275: private static void validateNativeStorePath(String path) {
276: if (path.split(LOCAL_NATIVE_REPO).length != 2) {
277: throw new RepositoryCreationException(
278: "Unsupported local RDF4J repository path. Expected file://path/repositories/id but got " +
279: path);
280: }
281: }
282:
283: private static RepositoryConfig createLocalNativeRepositoryConfig(String repoId,
284: DriverConfiguration configuration) {
285: SailImplConfig backend = new NativeStoreConfig();
286: if (configuration.is(Rdf4jConfigParam.USE_INFERENCE)) {
287: backend = new SchemaCachingRDFSInferencerConfig(backend);
288: }
289: final SailRepositoryConfig repoType = new SailRepositoryConfig(backend);
290: return new RepositoryConfig(repoId, repoType);
291: }
292:
293: private void verifyRepositoryCreated(URI serverUri, boolean isRemote) {
294: if (repository == null) {
295: if (isRemote) {
296: throw new RepositoryNotFoundException("Unable to reach repository at " + serverUri);
297: } else {
298: throw new RepositoryCreationException("Unable to create local repository at " + serverUri);
299: }
300: }
301: }
302:
303: /**
304: * Replaces the currently open repository with the specified one.
305: * <p>
306: * Note that this functionality is only supported for in-memory stores.
307: *
308: * @param newRepository The new repository to set
309: */
310: public void setRepository(Repository newRepository) {
311: Objects.requireNonNull(newRepository);
312: verifyOpen();
313: if (!isInMemoryRepository(repository)) {
314: throw new UnsupportedOperationException("Cannot replace repository which is not in-memory.");
315: }
316: repository.shutDown();
317: assert newRepository.isInitialized();
318: this.repository = newRepository;
319: // Since in-memory repositories are not registered in RepositoryManager, we shouldn't need to deal with it
320: }
321:
322: private static boolean isInMemoryRepository(Repository repo) {
323: if (!(repo instanceof SailRepository)) {
324: return false;
325: }
326: Sail sail = ((SailRepository) repo).getSail();
327: while (sail instanceof SailWrapper) {
328: sail = ((SailWrapper) sail).getBaseSail();
329: }
330: return sail instanceof MemoryStore;
331: }
332:
333: public ValueFactory getValueFactory() {
334: verifyOpen();
335: return repository.getValueFactory();
336: }
337:
338: public RepositoryConnection acquireConnection() throws Rdf4jDriverException {
339: verifyOpen();
340: // Workaround for local native storage being reset when multiple drivers access it
341: if (!repository.isInitialized()) {
342: repository.init();
343: }
344: LOG.trace("Acquiring repository connection.");
345: return acquire(1);
346: }
347:
348: private RepositoryConnection acquire(int attempts) throws Rdf4jDriverException {
349: try {
350: return repository.getConnection();
351: } catch (RepositoryException e) {
352:• if (attempts < maxReconnectAttempts) {
353: LOG.warn("Unable to acquire repository connection. Error is: {}. Retrying...", e.getMessage());
354: return acquire(attempts + 1);
355: }
356: LOG.error("Threshold of failed connection acquisition attempts reached, throwing exception.");
357: throw new Rdf4jDriverException(e);
358: }
359: }
360:
361: @Override
362: public void close() throws OntoDriverException {
363: if (!open) {
364: return;
365: }
366: try {
367: repository.shutDown();
368: if (manager != null) {
369: manager.shutDown();
370: }
371: } catch (RuntimeException e) {
372: throw new Rdf4jDriverException("Exception caught when closing repository connector.", e);
373: } finally {
374: this.open = false;
375: }
376: }
377:
378: @Override
379: public boolean isOpen() {
380: return open;
381: }
382:
383: private void verifyOpen() {
384: if (!open) {
385: throw new IllegalStateException("Connector is not open.");
386: }
387: }
388:
389: @Override
390: public <T> T unwrap(Class<T> cls) throws OntoDriverException {
391: verifyOpen();
392: if (cls.isAssignableFrom(getClass())) {
393: return cls.cast(this);
394: }
395: if (cls.isAssignableFrom(repository.getClass())) {
396: return cls.cast(repository);
397: }
398: if (repository instanceof Wrapper) {
399: return ((Wrapper) repository).unwrap(cls);
400: }
401: throw new Rdf4jDriverException("No class of type " + cls + " found.");
402: }
403: }