1   package eu.fbk.dkm.pikes.kv;
2   
3   import java.io.ByteArrayInputStream;
4   import java.io.ByteArrayOutputStream;
5   import java.io.Closeable;
6   import java.io.File;
7   import java.io.IOException;
8   import java.util.Arrays;
9   import java.util.List;
10  import java.util.Map;
11  import java.util.Objects;
12  import java.util.Set;
13  import java.util.concurrent.atomic.AtomicReference;
14  import java.util.logging.LogManager;
15  
16  import javax.annotation.Nullable;
17  
18  import com.google.common.base.Charsets;
19  import com.google.common.base.Joiner;
20  import com.google.common.base.Preconditions;
21  import com.google.common.base.Splitter;
22  import com.google.common.base.Throwables;
23  import com.google.common.collect.BiMap;
24  import com.google.common.collect.ImmutableBiMap;
25  import com.google.common.collect.Maps;
26  import com.google.common.collect.Ordering;
27  import com.google.common.collect.Sets;
28  import com.google.common.io.ByteStreams;
29  import com.google.common.io.Files;
30  import com.spotify.sparkey.CompressionType;
31  import com.spotify.sparkey.Sparkey;
32  import com.spotify.sparkey.SparkeyReader;
33  import com.spotify.sparkey.SparkeyWriter;
34  
35  import org.eclipse.rdf4j.model.BNode;
36  import org.eclipse.rdf4j.model.Literal;
37  import org.eclipse.rdf4j.model.Resource;
38  import org.eclipse.rdf4j.model.Statement;
39  import org.eclipse.rdf4j.model.IRI;
40  import org.eclipse.rdf4j.model.Value;
41  import org.eclipse.rdf4j.model.ValueFactory;
42  import org.eclipse.rdf4j.model.vocabulary.XMLSchema;
43  import org.eclipse.rdf4j.rio.RDFHandler;
44  import org.eclipse.rdf4j.rio.RDFHandlerException;
45  import org.eclipse.rdf4j.rio.Rio;
46  import org.slf4j.bridge.SLF4JBridgeHandler;
47  
48  import eu.fbk.utils.core.CommandLine;
49  import eu.fbk.rdfpro.AbstractRDFHandlerWrapper;
50  import eu.fbk.rdfpro.Mapper;
51  import eu.fbk.rdfpro.RDFHandlers;
52  import eu.fbk.rdfpro.RDFProcessors;
53  import eu.fbk.rdfpro.RDFSources;
54  import eu.fbk.rdfpro.Reducer;
55  import eu.fbk.rdfpro.util.StatementComponent;
56  import eu.fbk.rdfpro.util.Statements;
57  
58  public final class KeyQuadIndex implements KeyQuadSource, Closeable {
59  
    /**
     * Reserved key (the empty byte array) under which the namespace table is stored in the
     * index, as newline-separated namespace strings encoded in UTF-8.
     */
    private static final byte[] NS_KEY = new byte[] {};

    /** Marker byte that terminates the serialized statement list of an index entry. */
    private static final int HI_END = 0;

    /**
     * Marker byte written before a statement sharing the context of the previous one but not
     * its subject; a subject, predicate and object follow.
     */
    private static final int HI_END_S = 1 << 5;

    /**
     * Marker byte written before a statement sharing context and subject of the previous one
     * but not its predicate; a predicate and object follow.
     */
    private static final int HI_END_P = 2 << 5;

    /**
     * Marker byte written before a statement whose context differs from the previous one; a
     * context, subject, predicate and object follow.
     */
    private static final int HI_END_C = 3 << 5;

    /** Type tag, stored in the top three bits of the first byte of a value, for IRIs. */
    private static final int HI_IRI = 4 << 5;

    /** Type tag, stored in the top three bits of the first byte of a value, for literals. */
    private static final int HI_LITERAL = 5 << 5;

    /** Type tag, stored in the top three bits of the first byte of a value, for BNodes. */
    private static final int HI_BNODE = 6 << 5;

    /** Type tag used to encode a null value (e.g., a statement without context). */
    private static final int HI_NULL = 7 << 5;

    /** Bidirectional mapping between well-known XML Schema datatypes and their one-byte code. */
    private static final BiMap<IRI, Integer> DT_MAP;

    /** Reader over the underlying Sparkey file; released by {@link #close()}. */
    private final SparkeyReader reader;

    /** Maps a namespace string to its position in {@link #nsArray}; used when encoding keys. */
    private final Map<String, Integer> nsMap;

    /** Namespace table loaded from the index; the array position is the namespace code. */
    private final String[] nsArray;
85  
86      static {
87          final ImmutableBiMap.Builder<IRI, Integer> builder = ImmutableBiMap.builder();
88          int index = 0;
89          for (final IRI dt : new IRI[] { XMLSchema.DURATION, XMLSchema.DATETIME,
90                  XMLSchema.DAYTIMEDURATION, XMLSchema.TIME, XMLSchema.DATE, XMLSchema.GYEARMONTH,
91                  XMLSchema.GYEAR, XMLSchema.GMONTHDAY, XMLSchema.GDAY, XMLSchema.GMONTH,
92                  XMLSchema.STRING, XMLSchema.BOOLEAN, XMLSchema.BASE64BINARY, XMLSchema.HEXBINARY,
93                  XMLSchema.FLOAT, XMLSchema.DECIMAL, XMLSchema.DOUBLE, XMLSchema.ANYURI,
94                  XMLSchema.QNAME, XMLSchema.NOTATION, XMLSchema.NORMALIZEDSTRING, XMLSchema.TOKEN,
95                  XMLSchema.LANGUAGE, XMLSchema.NMTOKEN, XMLSchema.NMTOKENS, XMLSchema.NAME,
96                  XMLSchema.NCNAME, XMLSchema.ID, XMLSchema.IDREF, XMLSchema.IDREFS,
97                  XMLSchema.ENTITY, XMLSchema.ENTITIES, XMLSchema.INTEGER, XMLSchema.LONG,
98                  XMLSchema.INT, XMLSchema.SHORT, XMLSchema.BYTE, XMLSchema.NON_POSITIVE_INTEGER,
99                  XMLSchema.NEGATIVE_INTEGER, XMLSchema.NON_NEGATIVE_INTEGER,
100                 XMLSchema.POSITIVE_INTEGER, XMLSchema.UNSIGNED_LONG, XMLSchema.UNSIGNED_INT,
101                 XMLSchema.UNSIGNED_SHORT, XMLSchema.UNSIGNED_BYTE }) {
102             builder.put(dt, index++);
103         }
104         DT_MAP = builder.build();
105     }
106 
107     public KeyQuadIndex(final File file) {
108         try {
109             this.reader = Sparkey.open(file);
110             this.nsArray = Splitter.on('\n')
111                     .splitToList(new String(this.reader.getAsByteArray(NS_KEY), Charsets.UTF_8))
112                     .toArray(new String[0]);
113             this.nsMap = Maps.newHashMap();
114             for (int i = 0; i < this.nsArray.length; ++i) {
115                 this.nsMap.put(this.nsArray[i], i);
116             }
117         } catch (final IOException ex) {
118             throw Throwables.propagate(ex);
119         }
120     }
121 
122     @Override
123     public boolean get(final Value key, final RDFHandler handler) throws RDFHandlerException {
124         try {
125             final byte[] keyBytes = write(this.nsMap, new ByteArrayOutputStream(), key)
126                     .toByteArray();
127             final byte[] valueBytes = this.reader.getAsByteArray(keyBytes);
128             if (valueBytes == null) {
129                 return false;
130             } else {
131                 read(this.nsArray, new ByteArrayInputStream(valueBytes), handler);
132                 return true;
133             }
134         } catch (final IOException ex) {
135             throw Throwables.propagate(ex);
136         }
137     }
138 
    /**
     * Closes the underlying Sparkey reader.
     */
    @Override
    public void close() {
        this.reader.close();
    }
143 
144     @Nullable
145     private static Value read(final String[] nsArray, final ByteArrayInputStream stream) {
146 
147         try {
148             final ValueFactory vf = Statements.VALUE_FACTORY;
149             final int b = stream.read();
150             final int hi = b & 0xE0;
151 
152             if (hi == HI_NULL) {
153                 return null;
154 
155             } else if (hi == HI_BNODE) {
156                 final byte[] id = new byte[(b & 0x1F) << 8 | stream.read()];
157                 ByteStreams.readFully(stream, id);
158                 return vf.createBNode(new String(id, Charsets.UTF_8));
159 
160             } else if (hi == HI_IRI) {
161                 if ((b & 0x10) != 0) {
162                     final byte[] name = new byte[(b & 0xF) << 8 | stream.read()];
163                     final String ns = nsArray[stream.read()];
164                     ByteStreams.readFully(stream, name);
165                     return vf.createIRI(ns, new String(name, Charsets.UTF_8));
166                 } else {
167                     final byte[] str = new byte[(b & 0xF) << 8 | stream.read()];
168                     ByteStreams.readFully(stream, str);
169                     return vf.createIRI(new String(str, Charsets.UTF_8));
170                 }
171 
172             } else if (hi == HI_LITERAL) {
173                 byte[] lang = null;
174                 IRI dt = null;
175                 if ((b & 0x10) != 0) {
176                     lang = new byte[b & 0xF];
177                     ByteStreams.readFully(stream, lang);
178                 } else if ((b & 0x1) != 0) {
179                     dt = DT_MAP.inverse().get(stream.read());
180                 } else if ((b & 0x2) != 0) {
181                     dt = (IRI) read(nsArray, stream);
182                 }
183                 final byte[] label = new byte[stream.read() << 16 | stream.read() << 8
184                         | stream.read()];
185                 ByteStreams.readFully(stream, label);
186                 final String labelStr = new String(label, Charsets.UTF_8);
187                 if (lang != null) {
188                     return vf.createLiteral(labelStr, new String(lang, Charsets.UTF_8));
189                 } else if (dt != null) {
190                     return vf.createLiteral(labelStr, dt);
191                 } else {
192                     return vf.createLiteral(labelStr);
193                 }
194 
195             } else {
196                 throw new Error("Invalid marker: " + b);
197             }
198 
199         } catch (final IOException ex) {
200             throw new Error(ex);
201         }
202     }
203 
204     private static ByteArrayOutputStream write(final Map<String, Integer> nsMap,
205             final ByteArrayOutputStream stream, final Value value) {
206 
207         if (value == null) {
208             stream.write(HI_NULL);
209 
210         } else if (value instanceof BNode) {
211             final byte[] id = ((BNode) value).getID().getBytes(Charsets.UTF_8);
212             Preconditions.checkArgument(id.length <= 0x1FFF);
213             stream.write(HI_BNODE | id.length >>> 8);
214             stream.write(id.length & 0xFF);
215             stream.write(id, 0, id.length);
216 
217         } else if (value instanceof IRI) {
218             final IRI uri = (IRI) value;
219             final Integer nsID = nsMap.get(uri.getNamespace());
220             if (nsID != null && nsID <= 0xFF) {
221                 final byte[] name = uri.getLocalName().getBytes(Charsets.UTF_8);
222                 Preconditions.checkArgument(name.length <= 0xFFF);
223                 stream.write(HI_IRI | 0x10 | name.length >>> 8);
224                 stream.write(name.length & 0xFF);
225                 stream.write(nsID);
226                 stream.write(name, 0, name.length);
227             } else {
228                 final byte[] str = uri.stringValue().getBytes(Charsets.UTF_8);
229                 Preconditions.checkArgument(str.length <= 0xFFF);
230                 stream.write(HI_IRI | str.length >>> 8);
231                 stream.write(str.length & 0xFF);
232                 stream.write(str, 0, str.length);
233             }
234 
235         } else if (value instanceof Literal) {
236             final Literal lit = (Literal) value;
237             if (lit.getLanguage().isPresent()) {
238                 final byte[] lang = lit.getLanguage().get().getBytes(Charsets.UTF_8);
239                 Preconditions.checkArgument(lang.length <= 0x0F);
240                 stream.write(HI_LITERAL | 0x10 | lang.length);
241                 stream.write(lang, 0, lang.length);
242             } else if (lit.getDatatype().equals(XMLSchema.STRING)) {
243                 stream.write(HI_LITERAL);
244             } else {
245                 final Integer dtID = DT_MAP.get(lit.getDatatype());
246                 if (dtID != null) {
247                     stream.write(HI_LITERAL | 0x01);
248                     stream.write(dtID);
249                 } else {
250                     stream.write(HI_LITERAL | 0x2);
251                     write(nsMap, stream, lit.getDatatype());
252                 }
253             }
254             final byte[] label = lit.getLabel().getBytes(Charsets.UTF_8);
255             Preconditions.checkArgument(label.length < 0xFFFFFF);
256             stream.write(label.length >>> 16);
257             stream.write(label.length >>> 8 & 0xFF);
258             stream.write(label.length & 0xFF);
259             stream.write(label, 0, label.length);
260         }
261 
262         return stream;
263     }
264 
265     private static void read(final String[] nsArray, final ByteArrayInputStream stream,
266             final RDFHandler handler) throws RDFHandlerException {
267         final Value[] values = new Value[4];
268         int index = 0;
269         while (true) {
270             stream.mark(1);
271             final int hi = stream.read() & 0xFF;
272             if (hi == HI_END) {
273                 break;
274             } else if (hi == HI_END_C) {
275                 index = 0;
276             } else if (hi == HI_END_S) {
277                 index = 1;
278             } else if (hi == HI_END_P) {
279                 index = 2;
280             } else {
281                 stream.reset();
282             }
283             values[index++] = read(nsArray, stream);
284             if (index == 4) {
285                 handler.handleStatement(Statements.VALUE_FACTORY.createStatement(
286                         (Resource) values[1], (IRI) values[2], values[3], (Resource) values[0]));
287                 --index;
288             }
289         }
290     }
291 
292     private static ByteArrayOutputStream write(final Map<String, Integer> nsMap,
293             final ByteArrayOutputStream stream, final Iterable<Statement> stmts) {
294         Statement lastStmt = null;
295         for (final Statement stmt : Ordering.from(
296                 Statements.statementComparator("cspo", Statements.valueComparator())).sortedCopy(
297                 stmts)) {
298             final boolean sameC = lastStmt != null
299                     && Objects.equals(lastStmt.getContext(), stmt.getContext());
300             final boolean sameS = sameC && lastStmt.getSubject().equals(stmt.getSubject());
301             final boolean sameP = sameS && lastStmt.getPredicate().equals(stmt.getPredicate());
302             if (sameP) {
303                 write(nsMap, stream, stmt.getObject());
304             } else if (sameS) {
305                 stream.write(HI_END_P);
306                 write(nsMap, stream, stmt.getPredicate());
307                 write(nsMap, stream, stmt.getObject());
308             } else if (sameC) {
309                 stream.write(HI_END_S);
310                 write(nsMap, stream, stmt.getSubject());
311                 write(nsMap, stream, stmt.getPredicate());
312                 write(nsMap, stream, stmt.getObject());
313             } else {
314                 if (lastStmt != null) {
315                     stream.write(HI_END_C);
316                 }
317                 write(nsMap, stream, stmt.getContext());
318                 write(nsMap, stream, stmt.getSubject());
319                 write(nsMap, stream, stmt.getPredicate());
320                 write(nsMap, stream, stmt.getObject());
321             }
322             lastStmt = stmt;
323         }
324         stream.write(HI_END);
325         return stream;
326     }
327 
328     public static RDFHandler indexer(final File file, final StatementComponent component) {
329 
330         Objects.requireNonNull(file);
331         Objects.requireNonNull(component);
332 
333         final Map<String, Integer> nsMap = Maps.newHashMap();
334 
335         final AtomicReference<SparkeyWriter> writerRef = new AtomicReference<SparkeyWriter>();
336         final Reducer reducer = new Reducer() {
337 
338             @Override
339             public void reduce(final Value key, final Statement[] stmts, final RDFHandler handler)
340                     throws RDFHandlerException {
341                 final byte[] keyBytes = write(nsMap, new ByteArrayOutputStream(), key)
342                         .toByteArray();
343                 final byte[] stmtsBytes = write(nsMap, new ByteArrayOutputStream(),
344                         Arrays.asList(stmts)).toByteArray();
345                 synchronized (writerRef) {
346                     try {
347                         writerRef.get().put(keyBytes, stmtsBytes);
348                     } catch (final Throwable ex) {
349                         throw new RDFHandlerException(ex);
350                     }
351                 }
352             }
353 
354         };
355 
356         final RDFHandler mrHandler = RDFProcessors.mapReduce(
357                 Mapper.select("" + component.getLetter()), reducer, true).wrap(RDFHandlers.NIL);
358 
359         return new AbstractRDFHandlerWrapper(mrHandler) {
360 
361             private final Set<String> namespaces = Sets.newConcurrentHashSet();
362 
363             @Override
364             public void startRDF() throws RDFHandlerException {
365                 super.startRDF();
366             }
367 
368             @Override
369             public void handleStatement(final Statement stmt) throws RDFHandlerException {
370                 this.namespaces.add(stmt.getPredicate().getNamespace());
371                 if (stmt.getSubject() instanceof IRI) {
372                     this.namespaces.add(((IRI) stmt.getSubject()).getNamespace());
373                 }
374                 if (stmt.getObject() instanceof IRI) {
375                     this.namespaces.add(((IRI) stmt.getObject()).getNamespace());
376                 }
377                 if (stmt.getContext() instanceof IRI) {
378                     this.namespaces.add(((IRI) stmt.getContext()).getNamespace());
379                 }
380                 super.handleStatement(stmt);
381             }
382 
383             @Override
384             public void endRDF() throws RDFHandlerException {
385 
386                 int counter = 0;
387                 final List<String> nsList = Ordering.natural()
388                         .immutableSortedCopy(this.namespaces);
389                 for (final String namespace : nsList) {
390                     nsMap.put(namespace, counter++);
391                 }
392 
393                 try {
394                     writerRef.set(Sparkey.createNew(file, CompressionType.SNAPPY, 4096));
395                     writerRef.get().put(NS_KEY,
396                             Joiner.on('\n').join(nsList).getBytes(Charsets.UTF_8));
397 
398                     super.endRDF();
399 
400                     writerRef.get().flush();
401                     writerRef.get().writeHash();
402                     writerRef.get().close();
403                 } catch (final IOException ex) {
404                     throw new RDFHandlerException(ex);
405                 }
406             }
407 
408         };
409     }
410 
411     public static void main(final String... args) {
412         try {
413             LogManager.getLogManager().reset();
414             SLF4JBridgeHandler.removeHandlersForRootLogger();
415             SLF4JBridgeHandler.install();
416 
417             // Parse command line
418             final CommandLine cmd = CommandLine
419                     .parser()
420                     .withName("kv-index")
421                     .withOption("c", "component",
422                             "the component (s,p,o,c) to use for partitioning quads (default: s)",
423                             "COMP", CommandLine.Type.STRING, true, false, false)
424                     .withOption("r", "recursive", "whether to recurse into input directories")
425                     .withOption("o", "output", "output file name", "FILE", CommandLine.Type.FILE,
426                             true, false, true)
427                     .withHeader(
428                             "Read RDF quads, split them into partitions by component "
429                                     + "and index the partitions in a binary file for fast lookup")
430                     .parse(args);
431 
432             // Extract options
433             final StatementComponent component = StatementComponent.forLetter(cmd.getOptionValue(
434                     "c", String.class, "s").charAt(0));
435             final boolean recursive = cmd.hasOption("r");
436             final File output = cmd.getOptionValue("o", File.class);
437             final List<File> files = cmd.getArgs(File.class);
438 
439             // Expand file list if recursive
440             final Set<String> locations = Sets.newHashSet();
441             for (final File file : files) {
442                 locations.add(file.getAbsolutePath());
443                 if (recursive && file.isDirectory()) {
444                     for (final File child : Files.fileTreeTraverser().preOrderTraversal(file)) {
445                         if (Rio.getParserFormatForFileName(file.getAbsolutePath()) != null) {
446                             locations.add(child.getAbsolutePath());
447                         }
448                     }
449                 }
450             }
451 
452             // Build the indexer
453             final RDFHandler indexer = indexer(output, component);
454 
455             // Run the indexer
456             RDFProcessors.read(true, true, null, null,null,true,
457                     locations.toArray(new String[locations.size()])).apply(RDFSources.NIL,
458                     indexer, 1);
459 
460         } catch (final Throwable ex) {
461             // Display error information and terminate
462             CommandLine.fail(ex);
463         }
464     }
465 
466 }