1   package eu.fbk.dkm.pikes.naflib;
2   
3   import com.google.common.base.Objects;
4   import com.google.common.base.Strings;
5   import com.google.common.base.Throwables;
6   import com.google.common.collect.ImmutableList;
7   import com.google.common.collect.Iterators;
8   import com.google.common.collect.Lists;
9   import com.google.common.collect.UnmodifiableIterator;
10  import com.google.common.io.ByteStreams;
11  import eu.fbk.utils.svm.Util;
12  import eu.fbk.rdfpro.util.IO;
13  import ixa.kaflib.KAFDocument;
14  import org.slf4j.LoggerFactory;
15  
16  import javax.annotation.Nullable;
17  import java.io.ByteArrayInputStream;
18  import java.io.File;
19  import java.io.InputStream;
20  import java.io.Serializable;
21  import java.nio.file.Files;
22  import java.nio.file.Path;
23  import java.nio.file.Paths;
24  import java.util.*;
25  import java.util.function.BiConsumer;
26  import java.util.function.Consumer;
27  import java.util.stream.Stream;
28  import java.util.stream.StreamSupport;
29  
30  public final class Corpus implements Iterable<KAFDocument>, Serializable {
31  
32      private static final org.slf4j.Logger LOGGER = LoggerFactory.getLogger(Corpus.class);
33  
34      private static final long serialVersionUID = 1L;
35  
36      private static final Corpus EMPTY = new Corpus(new Path[0], null);
37  
38      private final Path[] files;
39  
40      @Nullable
41      private final BiConsumer<Path, KAFDocument> transformer;
42  
43      @Nullable
44      private transient Set<Path> fileSet;
45  
46      @Nullable
47      private transient Path path;
48  
49      public static Corpus create(final boolean recursive, final Object... filesOrDirs) {
50          return create(recursive, Arrays.asList(filesOrDirs));
51      }
52  
53      public static Corpus create(final boolean recursive, final Iterable<?> filesOrDirs) {
54  
55          final List<Path> paths = Lists.newArrayList();
56          for (final Object fileOrDir : filesOrDirs) {
57              if (fileOrDir instanceof Path) {
58                  paths.add((Path) fileOrDir);
59              } else if (fileOrDir instanceof File) {
60                  paths.add(((File) fileOrDir).toPath());
61              } else {
62                  paths.add(Paths.get(fileOrDir.toString()));
63              }
64          }
65  
66          // todo: this uses Util, a class included in utils-svm
67          final List<Path> files = Util.fileMatch(paths, ImmutableList.of(".naf", ".naf.gz",
68                  ".naf.bz2", ".naf.xz", ".xml", ".xml.gz", ".xml.bz2", ".xml.xz"), recursive);
69  
70          for (int i = 0; i < files.size(); ++i) {
71              files.set(i, files.get(i).toAbsolutePath().normalize());
72          }
73  
74          if (files.isEmpty()) {
75              return EMPTY;
76          } else {
77              return new Corpus(files.toArray(new Path[files.size()]), null);
78          }
79      }
80  
81      private Corpus(final Path[] files, @Nullable final BiConsumer<Path, KAFDocument> transformer) {
82          this.files = files;
83          this.transformer = transformer;
84      }
85  
86      public Path path() {
87          if (this.path == null) {
88              String prefix = this.files[0].toString();
89              for (final Path file : this.files) {
90                  prefix = Strings.commonPrefix(prefix, file.toString());
91              }
92              Path path = Paths.get(prefix);
93              if (!Files.exists(path) || !Files.isDirectory(path)) {
94                  path = path.getParent();
95              }
96              this.path = path.toAbsolutePath().normalize();
97          }
98          return this.path;
99      }
100 
101     public int size() {
102         return this.files.length;
103     }
104 
105     public boolean isEmpty() {
106         return this.files.length == 0;
107     }
108 
109     public Path file(final Object key) {
110         try {
111             int index;
112             if (key instanceof Number) {
113                 index = ((Number) key).intValue();
114             } else if (key instanceof File) {
115                 index = Arrays.binarySearch(this.files, ((File) key).toPath());
116             } else if (key instanceof Path) {
117                 index = Arrays.binarySearch(this.files, key);
118             } else {
119                 index = Arrays.binarySearch(this.files, Paths.get(key.toString()));
120             }
121             if (index < 0 || index >= this.files.length) {
122                 throw new IllegalArgumentException("No file in this corpus for " + key);
123             }
124             return this.files[index];
125 
126         } catch (final Throwable ex) {
127             throw Throwables.propagate(ex);
128         }
129     }
130 
131     public KAFDocument get(final Object key) {
132         try {
133             int index;
134             if (key instanceof Number) {
135                 index = ((Number) key).intValue();
136             } else if (key instanceof File) {
137                 index = Arrays.binarySearch(this.files, ((File) key).toPath());
138             } else if (key instanceof Path) {
139                 index = Arrays.binarySearch(this.files, key);
140             } else {
141                 index = Arrays.binarySearch(this.files, Paths.get(key.toString()));
142             }
143             if (index < 0 || index >= this.files.length) {
144                 throw new IllegalArgumentException("No file in this corpus for " + key);
145             }
146             final Path file = this.files[index].toAbsolutePath();
147 
148             KAFDocument document = null;
149             try (InputStream stream = IO.read(file.toString())) {
150                 byte[] bytes;
151                 bytes = ByteStreams.toByteArray(stream);
152                 document = KAFDocument.createFromStream(IO.utf8Reader(new ByteArrayInputStream(
153                         bytes)));
154             } catch (final Throwable ex) {
155                 LOGGER.warn("Failed to parse document " + file, ex);
156                 return null;
157             }
158 
159             final String relativePath = file.toString().substring(path().toString().length());
160             document.getPublic().publicId = relativePath;
161             if ("http://www.example.com".equals(document.getPublic().uri)) {
162                 document.getPublic().uri = "doc:" + relativePath;
163             }
164             if (this.transformer != null) {
165                 this.transformer.accept(file, document);
166             }
167             return document;
168 
169         } catch (final Throwable ex) {
170             throw Throwables.propagate(ex);
171         }
172     }
173 
174     @Override
175     public Iterator<KAFDocument> iterator() {
176         return new UnmodifiableIterator<KAFDocument>() {
177 
178             private int index = 0;
179 
180             @Override
181             public boolean hasNext() {
182                 return this.index < Corpus.this.files.length;
183             }
184 
185             @Override
186             public KAFDocument next() {
187                 return get(this.index++);
188             }
189 
190         };
191     }
192 
193     @Override
194     public Spliterator<KAFDocument> spliterator() {
195         return spliteratorHelper(Arrays.spliterator(this.files));
196     }
197 
198     private Spliterator<KAFDocument> spliteratorHelper(final Spliterator<Path> delegate) {
199         return new Spliterator<KAFDocument>() {
200 
201             @Override
202             public boolean tryAdvance(final Consumer<? super KAFDocument> action) {
203                 return delegate.tryAdvance(file -> {
204                     action.accept(get(file));
205                 });
206             }
207 
208             @Override
209             public Spliterator<KAFDocument> trySplit() {
210                 final Spliterator<Path> splittedDelegate = delegate.trySplit();
211                 return splittedDelegate == null ? null : spliteratorHelper(splittedDelegate);
212             }
213 
214             @Override
215             public long estimateSize() {
216                 return delegate.estimateSize();
217             }
218 
219             @Override
220             public int characteristics() {
221                 return Spliterator.IMMUTABLE | Spliterator.DISTINCT | Spliterator.NONNULL
222                         | Spliterator.ORDERED | Spliterator.SIZED | Spliterator.SUBSIZED;
223             }
224 
225         };
226     }
227 
228     public Stream<KAFDocument> stream() {
229         return StreamSupport.stream(spliterator(), false);
230     }
231 
232     public Stream<KAFDocument> parallelStream() {
233         return StreamSupport.stream(spliterator(), true);
234     }
235 
236     public Set<Path> files() {
237         if (this.fileSet == null) {
238             this.fileSet = new AbstractSet<Path>() {
239 
240                 @Override
241                 public int size() {
242                     return Corpus.this.files.length;
243                 }
244 
245                 @Override
246                 public boolean contains(final Object object) {
247                     return object instanceof File
248                             && Arrays.binarySearch(Corpus.this.files, object) >= 0;
249                 }
250 
251                 @Override
252                 public Iterator<Path> iterator() {
253                     return Iterators.forArray(Corpus.this.files);
254                 }
255 
256                 @Override
257                 public Spliterator<Path> spliterator() {
258                     return Arrays.spliterator(Corpus.this.files);
259                 }
260 
261             };
262         }
263         return this.fileSet;
264     }
265 
266     public Corpus transform(final Consumer<KAFDocument> transformer) {
267         return transform((final Path file, final KAFDocument document) -> {
268             transformer.accept(document);
269         });
270     }
271 
272     public Corpus transform(final BiConsumer<Path, KAFDocument> transformer) {
273         return new Corpus(this.files, this.transformer == null ? transformer
274                 : this.transformer.andThen(transformer));
275     }
276 
277     public Corpus fixURIs() {
278         return transform((final Path file, final KAFDocument document) -> {
279             final String relativePath = file.toString().substring(path().toString().length());
280             document.getPublic().uri = "doc:" + relativePath;
281             document.getPublic().publicId = relativePath;
282         });
283     }
284 
285     public Corpus[] split(@Nullable final Long shuffleSeed, final float... percentages) {
286 
287         // Shuffle the files if necessary, using the supplied seed
288         Path[] files = this.files;
289         if (shuffleSeed != null) {
290             final List<Path> list = Lists.newArrayList(files);
291             final Random random = new Random(shuffleSeed);
292             Collections.shuffle(list, random);
293             files = list.toArray(new Path[list.size()]);
294         }
295 
296         // Split the (shuffled) file array based on supplied percentages
297         final Corpus[] corpora = new Corpus[percentages.length];
298         int index = 0;
299         float cumulated = 0.0f;
300         for (int i = 0; i < percentages.length; ++i) {
301             cumulated += percentages[i];
302             if (cumulated > 1.0f) {
303                 throw new IllegalArgumentException("Invalid percentages (sum must be 1.0f): "
304                         + Arrays.toString(percentages));
305             }
306             final int endIndex = (int) Math.ceil(files.length * cumulated);
307             final Path[] partition = Arrays.copyOfRange(files, index, endIndex);
308             if (shuffleSeed != null) {
309                 Arrays.sort(partition);
310             }
311             corpora[i] = new Corpus(partition, this.transformer);
312             index = endIndex;
313         }
314         return corpora;
315     }
316 
317     @Override
318     public boolean equals(final Object object) {
319         if (object == this) {
320             return true;
321         }
322         if (!(object instanceof Corpus)) {
323             return false;
324         }
325         final Corpus other = (Corpus) object;
326         return Arrays.equals(this.files, other.files)
327                 && Objects.equal(this.transformer, other.transformer);
328     }
329 
330     @Override
331     public int hashCode() {
332         return Objects.hashCode(Arrays.hashCode(this.files), this.transformer);
333     }
334 
335     @Override
336     public String toString() {
337         if (this.files.length == 0) {
338             return "Empty corpus";
339         } else {
340             return this.files.length + " document(s) corpus (path: " + path() + ")";
341         }
342     }
343 
344 }