1 package eu.fbk.dkm.pikes.kv;
2
3 import java.io.ByteArrayInputStream;
4 import java.io.ByteArrayOutputStream;
5 import java.io.Closeable;
6 import java.io.File;
7 import java.io.IOException;
8 import java.util.Arrays;
9 import java.util.List;
10 import java.util.Map;
11 import java.util.Objects;
12 import java.util.Set;
13 import java.util.concurrent.atomic.AtomicReference;
14 import java.util.logging.LogManager;
15
16 import javax.annotation.Nullable;
17
18 import com.google.common.base.Charsets;
19 import com.google.common.base.Joiner;
20 import com.google.common.base.Preconditions;
21 import com.google.common.base.Splitter;
22 import com.google.common.base.Throwables;
23 import com.google.common.collect.BiMap;
24 import com.google.common.collect.ImmutableBiMap;
25 import com.google.common.collect.Maps;
26 import com.google.common.collect.Ordering;
27 import com.google.common.collect.Sets;
28 import com.google.common.io.ByteStreams;
29 import com.google.common.io.Files;
30 import com.spotify.sparkey.CompressionType;
31 import com.spotify.sparkey.Sparkey;
32 import com.spotify.sparkey.SparkeyReader;
33 import com.spotify.sparkey.SparkeyWriter;
34
35 import org.eclipse.rdf4j.model.BNode;
36 import org.eclipse.rdf4j.model.Literal;
37 import org.eclipse.rdf4j.model.Resource;
38 import org.eclipse.rdf4j.model.Statement;
39 import org.eclipse.rdf4j.model.IRI;
40 import org.eclipse.rdf4j.model.Value;
41 import org.eclipse.rdf4j.model.ValueFactory;
42 import org.eclipse.rdf4j.model.vocabulary.XMLSchema;
43 import org.eclipse.rdf4j.rio.RDFHandler;
44 import org.eclipse.rdf4j.rio.RDFHandlerException;
45 import org.eclipse.rdf4j.rio.Rio;
46 import org.slf4j.bridge.SLF4JBridgeHandler;
47
48 import eu.fbk.utils.core.CommandLine;
49 import eu.fbk.rdfpro.AbstractRDFHandlerWrapper;
50 import eu.fbk.rdfpro.Mapper;
51 import eu.fbk.rdfpro.RDFHandlers;
52 import eu.fbk.rdfpro.RDFProcessors;
53 import eu.fbk.rdfpro.RDFSources;
54 import eu.fbk.rdfpro.Reducer;
55 import eu.fbk.rdfpro.util.StatementComponent;
56 import eu.fbk.rdfpro.util.Statements;
57
58 public final class KeyQuadIndex implements KeyQuadSource, Closeable {
59
60 private static final byte[] NS_KEY = new byte[] {};
61
62 private static final int HI_END = 0;
63
64 private static final int HI_END_S = 1 << 5;
65
66 private static final int HI_END_P = 2 << 5;
67
68 private static final int HI_END_C = 3 << 5;
69
70 private static final int HI_IRI = 4 << 5;
71
72 private static final int HI_LITERAL = 5 << 5;
73
74 private static final int HI_BNODE = 6 << 5;
75
76 private static final int HI_NULL = 7 << 5;
77
78 private static final BiMap<IRI, Integer> DT_MAP;
79
80 private final SparkeyReader reader;
81
82 private final Map<String, Integer> nsMap;
83
84 private final String[] nsArray;
85
86 static {
87 final ImmutableBiMap.Builder<IRI, Integer> builder = ImmutableBiMap.builder();
88 int index = 0;
89 for (final IRI dt : new IRI[] { XMLSchema.DURATION, XMLSchema.DATETIME,
90 XMLSchema.DAYTIMEDURATION, XMLSchema.TIME, XMLSchema.DATE, XMLSchema.GYEARMONTH,
91 XMLSchema.GYEAR, XMLSchema.GMONTHDAY, XMLSchema.GDAY, XMLSchema.GMONTH,
92 XMLSchema.STRING, XMLSchema.BOOLEAN, XMLSchema.BASE64BINARY, XMLSchema.HEXBINARY,
93 XMLSchema.FLOAT, XMLSchema.DECIMAL, XMLSchema.DOUBLE, XMLSchema.ANYURI,
94 XMLSchema.QNAME, XMLSchema.NOTATION, XMLSchema.NORMALIZEDSTRING, XMLSchema.TOKEN,
95 XMLSchema.LANGUAGE, XMLSchema.NMTOKEN, XMLSchema.NMTOKENS, XMLSchema.NAME,
96 XMLSchema.NCNAME, XMLSchema.ID, XMLSchema.IDREF, XMLSchema.IDREFS,
97 XMLSchema.ENTITY, XMLSchema.ENTITIES, XMLSchema.INTEGER, XMLSchema.LONG,
98 XMLSchema.INT, XMLSchema.SHORT, XMLSchema.BYTE, XMLSchema.NON_POSITIVE_INTEGER,
99 XMLSchema.NEGATIVE_INTEGER, XMLSchema.NON_NEGATIVE_INTEGER,
100 XMLSchema.POSITIVE_INTEGER, XMLSchema.UNSIGNED_LONG, XMLSchema.UNSIGNED_INT,
101 XMLSchema.UNSIGNED_SHORT, XMLSchema.UNSIGNED_BYTE }) {
102 builder.put(dt, index++);
103 }
104 DT_MAP = builder.build();
105 }
106
107 public KeyQuadIndex(final File file) {
108 try {
109 this.reader = Sparkey.open(file);
110 this.nsArray = Splitter.on('\n')
111 .splitToList(new String(this.reader.getAsByteArray(NS_KEY), Charsets.UTF_8))
112 .toArray(new String[0]);
113 this.nsMap = Maps.newHashMap();
114 for (int i = 0; i < this.nsArray.length; ++i) {
115 this.nsMap.put(this.nsArray[i], i);
116 }
117 } catch (final IOException ex) {
118 throw Throwables.propagate(ex);
119 }
120 }
121
122 @Override
123 public boolean get(final Value key, final RDFHandler handler) throws RDFHandlerException {
124 try {
125 final byte[] keyBytes = write(this.nsMap, new ByteArrayOutputStream(), key)
126 .toByteArray();
127 final byte[] valueBytes = this.reader.getAsByteArray(keyBytes);
128 if (valueBytes == null) {
129 return false;
130 } else {
131 read(this.nsArray, new ByteArrayInputStream(valueBytes), handler);
132 return true;
133 }
134 } catch (final IOException ex) {
135 throw Throwables.propagate(ex);
136 }
137 }
138
139 @Override
140 public void close() {
141 this.reader.close();
142 }
143
144 @Nullable
145 private static Value read(final String[] nsArray, final ByteArrayInputStream stream) {
146
147 try {
148 final ValueFactory vf = Statements.VALUE_FACTORY;
149 final int b = stream.read();
150 final int hi = b & 0xE0;
151
152 if (hi == HI_NULL) {
153 return null;
154
155 } else if (hi == HI_BNODE) {
156 final byte[] id = new byte[(b & 0x1F) << 8 | stream.read()];
157 ByteStreams.readFully(stream, id);
158 return vf.createBNode(new String(id, Charsets.UTF_8));
159
160 } else if (hi == HI_IRI) {
161 if ((b & 0x10) != 0) {
162 final byte[] name = new byte[(b & 0xF) << 8 | stream.read()];
163 final String ns = nsArray[stream.read()];
164 ByteStreams.readFully(stream, name);
165 return vf.createIRI(ns, new String(name, Charsets.UTF_8));
166 } else {
167 final byte[] str = new byte[(b & 0xF) << 8 | stream.read()];
168 ByteStreams.readFully(stream, str);
169 return vf.createIRI(new String(str, Charsets.UTF_8));
170 }
171
172 } else if (hi == HI_LITERAL) {
173 byte[] lang = null;
174 IRI dt = null;
175 if ((b & 0x10) != 0) {
176 lang = new byte[b & 0xF];
177 ByteStreams.readFully(stream, lang);
178 } else if ((b & 0x1) != 0) {
179 dt = DT_MAP.inverse().get(stream.read());
180 } else if ((b & 0x2) != 0) {
181 dt = (IRI) read(nsArray, stream);
182 }
183 final byte[] label = new byte[stream.read() << 16 | stream.read() << 8
184 | stream.read()];
185 ByteStreams.readFully(stream, label);
186 final String labelStr = new String(label, Charsets.UTF_8);
187 if (lang != null) {
188 return vf.createLiteral(labelStr, new String(lang, Charsets.UTF_8));
189 } else if (dt != null) {
190 return vf.createLiteral(labelStr, dt);
191 } else {
192 return vf.createLiteral(labelStr);
193 }
194
195 } else {
196 throw new Error("Invalid marker: " + b);
197 }
198
199 } catch (final IOException ex) {
200 throw new Error(ex);
201 }
202 }
203
204 private static ByteArrayOutputStream write(final Map<String, Integer> nsMap,
205 final ByteArrayOutputStream stream, final Value value) {
206
207 if (value == null) {
208 stream.write(HI_NULL);
209
210 } else if (value instanceof BNode) {
211 final byte[] id = ((BNode) value).getID().getBytes(Charsets.UTF_8);
212 Preconditions.checkArgument(id.length <= 0x1FFF);
213 stream.write(HI_BNODE | id.length >>> 8);
214 stream.write(id.length & 0xFF);
215 stream.write(id, 0, id.length);
216
217 } else if (value instanceof IRI) {
218 final IRI uri = (IRI) value;
219 final Integer nsID = nsMap.get(uri.getNamespace());
220 if (nsID != null && nsID <= 0xFF) {
221 final byte[] name = uri.getLocalName().getBytes(Charsets.UTF_8);
222 Preconditions.checkArgument(name.length <= 0xFFF);
223 stream.write(HI_IRI | 0x10 | name.length >>> 8);
224 stream.write(name.length & 0xFF);
225 stream.write(nsID);
226 stream.write(name, 0, name.length);
227 } else {
228 final byte[] str = uri.stringValue().getBytes(Charsets.UTF_8);
229 Preconditions.checkArgument(str.length <= 0xFFF);
230 stream.write(HI_IRI | str.length >>> 8);
231 stream.write(str.length & 0xFF);
232 stream.write(str, 0, str.length);
233 }
234
235 } else if (value instanceof Literal) {
236 final Literal lit = (Literal) value;
237 if (lit.getLanguage().isPresent()) {
238 final byte[] lang = lit.getLanguage().get().getBytes(Charsets.UTF_8);
239 Preconditions.checkArgument(lang.length <= 0x0F);
240 stream.write(HI_LITERAL | 0x10 | lang.length);
241 stream.write(lang, 0, lang.length);
242 } else if (lit.getDatatype().equals(XMLSchema.STRING)) {
243 stream.write(HI_LITERAL);
244 } else {
245 final Integer dtID = DT_MAP.get(lit.getDatatype());
246 if (dtID != null) {
247 stream.write(HI_LITERAL | 0x01);
248 stream.write(dtID);
249 } else {
250 stream.write(HI_LITERAL | 0x2);
251 write(nsMap, stream, lit.getDatatype());
252 }
253 }
254 final byte[] label = lit.getLabel().getBytes(Charsets.UTF_8);
255 Preconditions.checkArgument(label.length < 0xFFFFFF);
256 stream.write(label.length >>> 16);
257 stream.write(label.length >>> 8 & 0xFF);
258 stream.write(label.length & 0xFF);
259 stream.write(label, 0, label.length);
260 }
261
262 return stream;
263 }
264
265 private static void read(final String[] nsArray, final ByteArrayInputStream stream,
266 final RDFHandler handler) throws RDFHandlerException {
267 final Value[] values = new Value[4];
268 int index = 0;
269 while (true) {
270 stream.mark(1);
271 final int hi = stream.read() & 0xFF;
272 if (hi == HI_END) {
273 break;
274 } else if (hi == HI_END_C) {
275 index = 0;
276 } else if (hi == HI_END_S) {
277 index = 1;
278 } else if (hi == HI_END_P) {
279 index = 2;
280 } else {
281 stream.reset();
282 }
283 values[index++] = read(nsArray, stream);
284 if (index == 4) {
285 handler.handleStatement(Statements.VALUE_FACTORY.createStatement(
286 (Resource) values[1], (IRI) values[2], values[3], (Resource) values[0]));
287 --index;
288 }
289 }
290 }
291
292 private static ByteArrayOutputStream write(final Map<String, Integer> nsMap,
293 final ByteArrayOutputStream stream, final Iterable<Statement> stmts) {
294 Statement lastStmt = null;
295 for (final Statement stmt : Ordering.from(
296 Statements.statementComparator("cspo", Statements.valueComparator())).sortedCopy(
297 stmts)) {
298 final boolean sameC = lastStmt != null
299 && Objects.equals(lastStmt.getContext(), stmt.getContext());
300 final boolean sameS = sameC && lastStmt.getSubject().equals(stmt.getSubject());
301 final boolean sameP = sameS && lastStmt.getPredicate().equals(stmt.getPredicate());
302 if (sameP) {
303 write(nsMap, stream, stmt.getObject());
304 } else if (sameS) {
305 stream.write(HI_END_P);
306 write(nsMap, stream, stmt.getPredicate());
307 write(nsMap, stream, stmt.getObject());
308 } else if (sameC) {
309 stream.write(HI_END_S);
310 write(nsMap, stream, stmt.getSubject());
311 write(nsMap, stream, stmt.getPredicate());
312 write(nsMap, stream, stmt.getObject());
313 } else {
314 if (lastStmt != null) {
315 stream.write(HI_END_C);
316 }
317 write(nsMap, stream, stmt.getContext());
318 write(nsMap, stream, stmt.getSubject());
319 write(nsMap, stream, stmt.getPredicate());
320 write(nsMap, stream, stmt.getObject());
321 }
322 lastStmt = stmt;
323 }
324 stream.write(HI_END);
325 return stream;
326 }
327
328 public static RDFHandler indexer(final File file, final StatementComponent component) {
329
330 Objects.requireNonNull(file);
331 Objects.requireNonNull(component);
332
333 final Map<String, Integer> nsMap = Maps.newHashMap();
334
335 final AtomicReference<SparkeyWriter> writerRef = new AtomicReference<SparkeyWriter>();
336 final Reducer reducer = new Reducer() {
337
338 @Override
339 public void reduce(final Value key, final Statement[] stmts, final RDFHandler handler)
340 throws RDFHandlerException {
341 final byte[] keyBytes = write(nsMap, new ByteArrayOutputStream(), key)
342 .toByteArray();
343 final byte[] stmtsBytes = write(nsMap, new ByteArrayOutputStream(),
344 Arrays.asList(stmts)).toByteArray();
345 synchronized (writerRef) {
346 try {
347 writerRef.get().put(keyBytes, stmtsBytes);
348 } catch (final Throwable ex) {
349 throw new RDFHandlerException(ex);
350 }
351 }
352 }
353
354 };
355
356 final RDFHandler mrHandler = RDFProcessors.mapReduce(
357 Mapper.select("" + component.getLetter()), reducer, true).wrap(RDFHandlers.NIL);
358
359 return new AbstractRDFHandlerWrapper(mrHandler) {
360
361 private final Set<String> namespaces = Sets.newConcurrentHashSet();
362
363 @Override
364 public void startRDF() throws RDFHandlerException {
365 super.startRDF();
366 }
367
368 @Override
369 public void handleStatement(final Statement stmt) throws RDFHandlerException {
370 this.namespaces.add(stmt.getPredicate().getNamespace());
371 if (stmt.getSubject() instanceof IRI) {
372 this.namespaces.add(((IRI) stmt.getSubject()).getNamespace());
373 }
374 if (stmt.getObject() instanceof IRI) {
375 this.namespaces.add(((IRI) stmt.getObject()).getNamespace());
376 }
377 if (stmt.getContext() instanceof IRI) {
378 this.namespaces.add(((IRI) stmt.getContext()).getNamespace());
379 }
380 super.handleStatement(stmt);
381 }
382
383 @Override
384 public void endRDF() throws RDFHandlerException {
385
386 int counter = 0;
387 final List<String> nsList = Ordering.natural()
388 .immutableSortedCopy(this.namespaces);
389 for (final String namespace : nsList) {
390 nsMap.put(namespace, counter++);
391 }
392
393 try {
394 writerRef.set(Sparkey.createNew(file, CompressionType.SNAPPY, 4096));
395 writerRef.get().put(NS_KEY,
396 Joiner.on('\n').join(nsList).getBytes(Charsets.UTF_8));
397
398 super.endRDF();
399
400 writerRef.get().flush();
401 writerRef.get().writeHash();
402 writerRef.get().close();
403 } catch (final IOException ex) {
404 throw new RDFHandlerException(ex);
405 }
406 }
407
408 };
409 }
410
411 public static void main(final String... args) {
412 try {
413 LogManager.getLogManager().reset();
414 SLF4JBridgeHandler.removeHandlersForRootLogger();
415 SLF4JBridgeHandler.install();
416
417
418 final CommandLine cmd = CommandLine
419 .parser()
420 .withName("kv-index")
421 .withOption("c", "component",
422 "the component (s,p,o,c) to use for partitioning quads (default: s)",
423 "COMP", CommandLine.Type.STRING, true, false, false)
424 .withOption("r", "recursive", "whether to recurse into input directories")
425 .withOption("o", "output", "output file name", "FILE", CommandLine.Type.FILE,
426 true, false, true)
427 .withHeader(
428 "Read RDF quads, split them into partitions by component "
429 + "and index the partitions in a binary file for fast lookup")
430 .parse(args);
431
432
433 final StatementComponent component = StatementComponent.forLetter(cmd.getOptionValue(
434 "c", String.class, "s").charAt(0));
435 final boolean recursive = cmd.hasOption("r");
436 final File output = cmd.getOptionValue("o", File.class);
437 final List<File> files = cmd.getArgs(File.class);
438
439
440 final Set<String> locations = Sets.newHashSet();
441 for (final File file : files) {
442 locations.add(file.getAbsolutePath());
443 if (recursive && file.isDirectory()) {
444 for (final File child : Files.fileTreeTraverser().preOrderTraversal(file)) {
445 if (Rio.getParserFormatForFileName(file.getAbsolutePath()) != null) {
446 locations.add(child.getAbsolutePath());
447 }
448 }
449 }
450 }
451
452
453 final RDFHandler indexer = indexer(output, component);
454
455
456 RDFProcessors.read(true, true, null, null,null,true,
457 locations.toArray(new String[locations.size()])).apply(RDFSources.NIL,
458 indexer, 1);
459
460 } catch (final Throwable ex) {
461
462 CommandLine.fail(ex);
463 }
464 }
465
466 }