Package: RepositoryConnectorInitializer
RepositoryConnectorInitializer
name | instruction | branch | complexity | line | method | ||||||||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
RepositoryConnectorInitializer(DriverConfiguration, int) |
|
|
|
|
|
||||||||||||||||||||
connectToRemote(String, int) |
|
|
|
|
|
||||||||||||||||||||
connectToRemoteRepository(String) |
|
|
|
|
|
||||||||||||||||||||
createInMemoryRepository() |
|
|
|
|
|
||||||||||||||||||||
createLocalNativeRepositoryConfig(String, DriverConfiguration) |
|
|
|
|
|
||||||||||||||||||||
createLocalRepository() |
|
|
|
|
|
||||||||||||||||||||
createNativeRepository(DriverConfiguration, String) |
|
|
|
|
|
||||||||||||||||||||
createRepositoryFromConfig() |
|
|
|
|
|
||||||||||||||||||||
getConfigFileContent() |
|
|
|
|
|
||||||||||||||||||||
getManager() |
|
|
|
|
|
||||||||||||||||||||
getRepository() |
|
|
|
|
|
||||||||||||||||||||
getRepositoryId() |
|
|
|
|
|
||||||||||||||||||||
getRepositoryManagerBaseDir() |
|
|
|
|
|
||||||||||||||||||||
initializeRepository() |
|
|
|
|
|
||||||||||||||||||||
isFileUri(URI) |
|
|
|
|
|
||||||||||||||||||||
isRemoteRepository(URI) |
|
|
|
|
|
||||||||||||||||||||
loadRepositoryConfig() |
|
|
|
|
|
||||||||||||||||||||
static {...} |
|
|
|
|
|
||||||||||||||||||||
validateNativeStorePath(String) |
|
|
|
|
|
||||||||||||||||||||
verifyRepositoryCreated(URI, boolean) |
|
|
|
|
|
Coverage
1: /**
2: * Copyright (C) 2022 Czech Technical University in Prague
3: * <p>
4: * This program is free software: you can redistribute it and/or modify it under the terms of the GNU General Public
5: * License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later
6: * version.
7: * <p>
8: * This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied
9: * warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
10: * details. You should have received a copy of the GNU General Public License along with this program. If not, see
11: * <http://www.gnu.org/licenses/>.
12: */
13: package cz.cvut.kbss.ontodriver.rdf4j.connector;
14:
15: import cz.cvut.kbss.ontodriver.config.DriverConfiguration;
16: import cz.cvut.kbss.ontodriver.rdf4j.config.Rdf4jConfigParam;
17: import cz.cvut.kbss.ontodriver.rdf4j.exception.Rdf4jDriverException;
18: import cz.cvut.kbss.ontodriver.rdf4j.exception.RepositoryCreationException;
19: import cz.cvut.kbss.ontodriver.rdf4j.exception.RepositoryNotFoundException;
20: import org.eclipse.rdf4j.model.Model;
21: import org.eclipse.rdf4j.model.Resource;
22: import org.eclipse.rdf4j.model.vocabulary.RDF;
23: import org.eclipse.rdf4j.repository.Repository;
24: import org.eclipse.rdf4j.repository.RepositoryException;
25: import org.eclipse.rdf4j.repository.config.RepositoryConfig;
26: import org.eclipse.rdf4j.repository.config.RepositoryConfigException;
27: import org.eclipse.rdf4j.repository.config.RepositoryConfigSchema;
28: import org.eclipse.rdf4j.repository.manager.RemoteRepositoryManager;
29: import org.eclipse.rdf4j.repository.manager.RepositoryManager;
30: import org.eclipse.rdf4j.repository.manager.RepositoryProvider;
31: import org.eclipse.rdf4j.repository.sail.SailRepository;
32: import org.eclipse.rdf4j.repository.sail.config.SailRepositoryConfig;
33: import org.eclipse.rdf4j.rio.RDFFormat;
34: import org.eclipse.rdf4j.rio.Rio;
35: import org.eclipse.rdf4j.sail.config.SailImplConfig;
36: import org.eclipse.rdf4j.sail.inferencer.fc.SchemaCachingRDFSInferencer;
37: import org.eclipse.rdf4j.sail.inferencer.fc.config.SchemaCachingRDFSInferencerConfig;
38: import org.eclipse.rdf4j.sail.memory.MemoryStore;
39: import org.eclipse.rdf4j.sail.nativerdf.config.NativeStoreConfig;
40: import org.slf4j.Logger;
41: import org.slf4j.LoggerFactory;
42:
43: import java.io.FileInputStream;
44: import java.io.FileNotFoundException;
45: import java.io.IOException;
46: import java.io.InputStream;
47: import java.net.URI;
48: import java.util.Optional;
49: import java.util.Set;
50:
51: class RepositoryConnectorInitializer {
52:
53: private static final Logger LOG = LoggerFactory.getLogger(RepositoryConnectorInitializer.class);
54:
55: private static final String[] KNOWN_REMOTE_SCHEMES = {"http", "https", "ftp"};
56: private static final String LOCAL_NATIVE_REPO = "repositories/";
57: private static final String FILE_SCHEME = "file";
58: private static final String CLASSPATH_PREFIX = "classpath:";
59:
60: private final DriverConfiguration configuration;
61: private final int maxReconnectAttempts;
62:
63: private RepositoryManager manager;
64: private Repository repository;
65:
66: RepositoryConnectorInitializer(DriverConfiguration configuration, int maxReconnectAttempts) {
67: this.configuration = configuration;
68: this.maxReconnectAttempts = maxReconnectAttempts;
69: }
70:
71: void initializeRepository() throws Rdf4jDriverException {
72: final URI serverUri = configuration.getStorageProperties().getPhysicalURI();
73: LOG.debug("Initializing connector to repository at {}", serverUri);
74: try {
75: final boolean isRemote = isRemoteRepository(serverUri);
76:• if (isRemote) {
77: this.repository = connectToRemoteRepository(serverUri.toString());
78: } else {
79: this.repository = createLocalRepository();
80: }
81: verifyRepositoryCreated(serverUri, isRemote);
82: repository.init();
83: } catch (RepositoryException | RepositoryConfigException e) {
84: throw new Rdf4jDriverException("Failed to acquire RDF4J repository connection.", e);
85: }
86: }
87:
88: private static boolean isRemoteRepository(URI uri) {
89: final String scheme = uri.getScheme();
90:• for (String s : KNOWN_REMOTE_SCHEMES) {
91:• if (s.equals(scheme)) {
92: return true;
93: }
94: }
95: return false;
96: }
97:
98: private Repository connectToRemoteRepository(String repoUri) {
99: this.manager = RepositoryProvider.getRepositoryManagerOfRepository(repoUri);
100: final RemoteRepositoryManager remoteManager = (RemoteRepositoryManager) manager;
101: final String username = configuration.getStorageProperties().getUsername();
102:• if (username != null) {
103: final String password = configuration.getStorageProperties().getPassword();
104: remoteManager.setUsernameAndPassword(username, password);
105: }
106: return connectToRemote(repoUri, 1);
107: }
108:
109: private Repository connectToRemote(String repoUri, int attempts) {
110: try {
111: return manager.getRepository(RepositoryProvider.getRepositoryIdOfRepository(repoUri));
112: } catch (RepositoryException e) {
113:• if (attempts < maxReconnectAttempts) {
114: LOG.warn("Unable to connect to repository {}. Error is: {}. Retrying...", repoUri, e.getMessage());
115: return connectToRemote(repoUri, attempts + 1);
116: }
117: LOG.error("Threshold of failed connection attempts reached, throwing exception.");
118: throw e;
119: }
120: }
121:
122: private Repository createLocalRepository() {
123:• if (configuration.isSet(Rdf4jConfigParam.REPOSITORY_CONFIG)) {
124: return createRepositoryFromConfig();
125: }
126: final URI localUri = configuration.getStorageProperties().getPhysicalURI();
127:• if (!isFileUri(localUri) && configuration.is(Rdf4jConfigParam.USE_VOLATILE_STORAGE)) {
128: return createInMemoryRepository();
129: } else {
130: return createNativeRepository(configuration, localUri.toString());
131: }
132: }
133:
134: private Repository createRepositoryFromConfig() {
135: LOG.trace("Creating local repository from repository config file.");
136: final RepositoryConfig repoConfig = loadRepositoryConfig();
137: this.manager = RepositoryProvider.getRepositoryManager(getRepositoryManagerBaseDir().orElse(""));
138: manager.addRepositoryConfig(repoConfig);
139: return manager.getRepository(getRepositoryId());
140: }
141:
142: private RepositoryConfig loadRepositoryConfig() {
143: try (final InputStream is = getConfigFileContent()) {
144: final Model configModel = Rio.parse(is, "", RDFFormat.TURTLE);
145: final Set<Resource> resources =
146: configModel.filter(null, RDF.TYPE, RepositoryConfigSchema.REPOSITORY).subjects();
147:• assert resources.size() == 1;
148: return RepositoryConfig.create(configModel, resources.iterator().next());
149: } catch (IOException e) {
150: throw new RepositoryCreationException("Unable to create repository from the specified configuration.", e);
151: }
152: }
153:
154: private InputStream getConfigFileContent() {
155: final String configPath = configuration.getProperty(Rdf4jConfigParam.REPOSITORY_CONFIG);
156: LOG.trace("Loading repository configuration file content from {}.", configPath);
157:• if (configPath.startsWith(CLASSPATH_PREFIX)) {
158: final InputStream is =
159: getClass().getClassLoader().getResourceAsStream(configPath.substring(CLASSPATH_PREFIX.length()));
160:• if (is == null) {
161: throw new RepositoryCreationException(
162: "Unable to find repository configuration file on classpath location " + configPath);
163: }
164: return is;
165: } else {
166: try {
167: return new FileInputStream(configPath);
168: } catch (FileNotFoundException e) {
169: throw new RepositoryCreationException("Unable to find repository configuration file at " + configPath,
170: e);
171: }
172: }
173: }
174:
175: private Optional<String> getRepositoryManagerBaseDir() {
176: final String physicalUri = configuration.getStorageProperties().getPhysicalURI().toString();
177: final String[] tmp = physicalUri.split(LOCAL_NATIVE_REPO);
178:• return tmp.length == 2 ? Optional.of(tmp[0]) : Optional.empty();
179: }
180:
181: private String getRepositoryId() {
182: final String physicalUri = configuration.getStorageProperties().getPhysicalURI().toString();
183: final String[] tmp = physicalUri.split(LOCAL_NATIVE_REPO);
184:• if (tmp.length != 2) {
185: return physicalUri;
186: }
187: String repoId = tmp[1];
188: // Get rid of the trailing slash if necessary
189:• return repoId.charAt(repoId.length() - 1) == '/' ? repoId.substring(0, repoId.length() - 1) : repoId;
190: }
191:
192: private static boolean isFileUri(URI uri) {
193:• return uri.getScheme() != null && uri.getScheme().equals(FILE_SCHEME);
194: }
195:
196: /**
197: * Creates a local in-memory RDF4J repository which is disposed of when the VM shuts down.
198: */
199: private Repository createInMemoryRepository() {
200: LOG.trace("Creating local in-memory repository.");
201: final MemoryStore ms = new MemoryStore();
202:• if (configuration.is(Rdf4jConfigParam.USE_INFERENCE)) {
203: return new SailRepository(new SchemaCachingRDFSInferencer(ms));
204: } else {
205: return new SailRepository(ms);
206: }
207: }
208:
209: /**
210: * Creates native repository.
211: * <p>
212: * This kind of repository stores data in files and is persistent after the VM shuts down.
213: */
214: private Repository createNativeRepository(DriverConfiguration configuration, String localUri) {
215: LOG.trace("Creating local native repository at " + localUri);
216: validateNativeStorePath(localUri);
217: try {
218: this.manager = RepositoryProvider.getRepositoryManagerOfRepository(localUri);
219: final String repoId = getRepositoryId();
220: final RepositoryConfig cfg = createLocalNativeRepositoryConfig(repoId, configuration);
221: manager.addRepositoryConfig(cfg);
222: return manager.getRepository(repoId);
223: } catch (RepositoryConfigException | RepositoryException e) {
224: throw new RepositoryCreationException("Unable to create local repository at " + localUri, e);
225: }
226: }
227:
228: private static void validateNativeStorePath(String path) {
229:• if (path.split(LOCAL_NATIVE_REPO).length != 2) {
230: throw new RepositoryCreationException(
231: "Unsupported local RDF4J repository path. Expected file://path/repositories/id but got " +
232: path);
233: }
234: }
235:
236: private static RepositoryConfig createLocalNativeRepositoryConfig(String repoId,
237: DriverConfiguration configuration) {
238: SailImplConfig backend = new NativeStoreConfig();
239:• if (configuration.is(Rdf4jConfigParam.USE_INFERENCE)) {
240: backend = new SchemaCachingRDFSInferencerConfig(backend);
241: }
242: final SailRepositoryConfig repoType = new SailRepositoryConfig(backend);
243: return new RepositoryConfig(repoId, repoType);
244: }
245:
246: private void verifyRepositoryCreated(URI serverUri, boolean isRemote) {
247:• if (repository == null) {
248:• if (isRemote) {
249: throw new RepositoryNotFoundException("Unable to reach repository at " + serverUri);
250: } else {
251: throw new RepositoryCreationException("Unable to create local repository at " + serverUri);
252: }
253: }
254: }
255:
256: RepositoryManager getManager() {
257: return manager;
258: }
259:
260: Repository getRepository() {
261: return repository;
262: }
263: }