1   package eu.fbk.dkm.pikes.tintop;
2   
import com.google.common.io.Files;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import eu.fbk.dkm.pikes.tintopclient.TintopSession;
import eu.fbk.utils.core.CommandLine;
import eu.fbk.utils.core.FrequencyHashSet;
import eu.fbk.rdfpro.util.IO;
import ixa.kaflib.KAFDocument;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.IOUtils;
import org.jdom2.JDOMException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.*;
import java.nio.charset.StandardCharsets;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
25  
26  /**
27   * Created by alessio on 19/01/16.
28   */
29  
30  public class FolderOrchestrator {
31  
32      private static final Logger logger = LoggerFactory.getLogger(FolderOrchestrator.class);
33      static final private int DEFAULT_MAX_ERR_ON_FILE = 5;
34      static final private int DEFAULT_MAX_SIZE = 50000;
35      static final private int DEFAULT_SIZE = 10;
36      static final private int DEFAULT_SLEEPING_TIME = 60000;
37  
38      private int maxErrOnFile = DEFAULT_MAX_ERR_ON_FILE;
39      private int maxSize = DEFAULT_MAX_SIZE;
40      private FrequencyHashSet<String> fileOnError = new FrequencyHashSet<>();
41      private String fileCache = null;
42      private int skipped = 0;
43      public static String[] DEFAULT_EXTENSIONS = new String[] { "xml", "naf" };
44  
45      public FolderOrchestrator() {
46      }
47  
48      public int getMaxErrOnFile() {
49          return maxErrOnFile;
50      }
51  
52      public void setMaxErrOnFile(int maxErrOnFile) {
53          this.maxErrOnFile = maxErrOnFile;
54      }
55  
56      public int getMaxSize() {
57          return maxSize;
58      }
59  
60      public void setMaxSize(int maxSize) {
61          this.maxSize = maxSize;
62      }
63  
64      synchronized public void markFileAsNotDone(String filename) {
65          fileOnError.add(filename);
66          if (fileOnError.get(filename) <= maxErrOnFile) {
67              fileCache = filename;
68  
69          } else {
70              logger.warn(String.format("File %s skipped, more than %d errors", filename, DEFAULT_MAX_ERR_ON_FILE));
71          }
72      }
73  
74      public class LocalTintopClient implements Runnable {
75  
76          TintopSession session;
77          AnnotationPipeline pipeline;
78  
79          public LocalTintopClient(TintopSession session, AnnotationPipeline pipeline) {
80              this.session = session;
81              this.pipeline = pipeline;
82          }
83  
84          @Override
85          public void run() {
86              while (true) {
87                  String filename = null;
88                  try {
89                      filename = getNextFile(session);
90                      if (filename == null) {
91                          break;
92                      }
93  
94                      File file = new File(filename);
95                      if (!file.exists()) {
96                          break;
97                      }
98  
99                      File outputFile = getOutputFile(file, session);
100 
101                     // todo: use parameters
102                     outputFile = new File(outputFile.getAbsolutePath() + ".gz");
103 
104                     logger.debug("Output file: " + outputFile);
105 
106                     logger.info("Loading file: " + filename);
107                     BufferedReader reader = new BufferedReader(new FileReader(filename));
108                     String whole = IOUtils.toString(reader);
109                     reader.close();
110 
111                     KAFDocument doc;
112 
113                     String naf;
114                     try {
115                         doc = pipeline.parseFromString(whole);
116                         naf = doc.toString();
117                     } catch (Throwable e) {
118                         e.printStackTrace();
119                         throw new Exception(e);
120                     }
121 
122                     logger.debug(naf);
123                     if (naf != null) {
124                         logger.info("Writing file " + outputFile);
125                         Files.createParentDirs(outputFile);
126                         try (Writer w = IO.utf8Writer(IO.buffer(IO.write(outputFile.getAbsolutePath())))) {
127                             w.write(naf);
128                         }
129                     }
130 
131                 } catch (final Throwable ex) {
132                     logger.error(filename + " --- " + ex.getMessage());
133                     markFileAsNotDone(filename);
134 
135                     try {
136                         logger.info("Sleeping...");
137                         Thread.sleep(DEFAULT_SLEEPING_TIME);
138                     } catch (Exception e) {
139                         logger.error(e.getMessage());
140                     }
141                 }
142             }
143         }
144 
145     }
146 
147     private File getOutputFile(File inputFile, TintopSession session) {
148         String outputFile = session.getOutput().getAbsolutePath() + inputFile.getAbsolutePath()
149                 .substring(session.getInput().getAbsolutePath().length());
150         return new File(outputFile);
151     }
152 
153     synchronized public String getNextFile(TintopSession session) {
154 
155         fIter:
156         while (fileCache != null || session.getFileIterator().hasNext()) {
157 
158             File file;
159             if (fileCache != null) {
160                 file = new File(fileCache);
161                 fileCache = null;
162             } else {
163                 file = session.getFileIterator().next();
164             }
165 
166             File outputFile = getOutputFile(file, session);
167 
168             // todo: use parameters
169             outputFile = new File(outputFile.getAbsolutePath() + ".gz");
170 
171             logger.debug("Output file: " + outputFile);
172 
173             if (outputFile.exists()) {
174                 logger.debug("Skipping file (it exists): " + file);
175                 continue fIter;
176             }
177 
178             for (String p : session.getSkipPatterns()) {
179                 if (file.toString().contains(p)) {
180                     logger.debug("Skipping file (skip pattern): " + file);
181                     continue fIter;
182                 }
183             }
184 
185             if (maxSize > 0 && file.length() > maxSize) {
186                 logger.debug("Skipping file (too big, " + file.length() + "): " + file);
187                 skipped++;
188                 continue fIter;
189             }
190 
191             // File is empty
192             if (file.length() < 1000) {
193                 try {
194                     KAFDocument document = KAFDocument.createFromFile(file);
195                     if (document.getRawText() == null || document.getRawText().trim().length() == 0) {
196                         logger.info("File is empty: " + file);
197                         logger.info("Writing empty file " + outputFile);
198                         Files.createParentDirs(outputFile);
199                         try (Writer w = IO.utf8Writer(IO.buffer(IO.write(outputFile.getAbsolutePath())))) {
200                             w.write(document.toString());
201                         }
202                         continue fIter;
203                     }
204                 } catch (IOException e) {
205                     e.printStackTrace();
206                     skipped++;
207                     continue fIter;
208                 } catch (JDOMException e) {
209                     e.printStackTrace();
210                     skipped++;
211                     continue fIter;
212                 }
213             }
214 
215             return file.getAbsolutePath();
216         }
217 
218         return null;
219     }
220 
221     public void run(TintopSession session, AnnotationPipeline pipeline, int size) {
222         logger.info(String.format("Started process with %d server(s)", size));
223 
224         final ThreadFactory factory = new ThreadFactoryBuilder().setDaemon(true).setNameFormat("client-%02d").build();
225         final ExecutorService executor = Executors.newCachedThreadPool(factory);
226         try {
227             for (int i = 0; i < size; i++) {
228                 executor.submit(new LocalTintopClient(session, pipeline));
229             }
230 
231             executor.shutdown();
232             executor.awaitTermination(Integer.MAX_VALUE, TimeUnit.MILLISECONDS);
233 
234         } catch (final InterruptedException ex) {
235             // ignore
236 
237         } finally {
238             executor.shutdownNow();
239         }
240 
241     }
242 
243     public static void main(String[] args) {
244         try {
245             final CommandLine cmd = CommandLine
246                     .parser()
247                     .withName("./orchestrator-folder")
248                     .withHeader("Run the Tintop Orchestrator in a particular folder")
249                     .withOption("i", "input", "Input folder", "FOLDER",
250                             CommandLine.Type.DIRECTORY_EXISTING, true, false, true)
251                     .withOption("o", "output", "Output folder", "FOLDER",
252                             CommandLine.Type.DIRECTORY, true, false, true)
253                     .withOption(null, "skip", "Text file with list of file patterns to skip (one per line)", "FILE",
254                             CommandLine.Type.FILE_EXISTING, true, false, false)
255                     .withOption("m", "max-fail",
256                             String.format("Max fails on a single file to skip (default %d)", DEFAULT_MAX_ERR_ON_FILE),
257                             "INT", CommandLine.Type.INTEGER, true, false, false)
258                     .withOption("z", "max-size",
259                             String.format("Max size of a NAF empty file (default %d)", DEFAULT_MAX_SIZE),
260                             "INT", CommandLine.Type.INTEGER, true, false, false)
261                     .withOption("s", "size",
262                             String.format("Number of threads (default %d)", DEFAULT_SIZE),
263                             "INT", CommandLine.Type.INTEGER, true, false, false)
264                     .withOption("c", "config", "Configuration file", "FILE", CommandLine.Type.FILE_EXISTING, true,
265                             false, false)
266                     .withOption(null, "properties", "Additional properties", "PROPS", CommandLine.Type.STRING, true,
267                             true, false)
268                     .withLogger(LoggerFactory.getLogger("eu.fbk")).parse(args);
269 
270             File input = cmd.getOptionValue("input", File.class);
271             File output = cmd.getOptionValue("output", File.class);
272             File skip = cmd.getOptionValue("skip", File.class);
273             File configFile = cmd.getOptionValue("config", File.class);
274 
275             Integer maxFail = cmd.getOptionValue("max-fail", Integer.class, DEFAULT_MAX_ERR_ON_FILE);
276             Integer maxSize = cmd.getOptionValue("max-size", Integer.class, DEFAULT_MAX_SIZE);
277             Integer size = cmd.getOptionValue("size", Integer.class, DEFAULT_SIZE);
278 
279             List<String> addProperties = cmd.getOptionValues("properties", String.class);
280             Properties additionalProps = new Properties();
281             for (String property : addProperties) {
282                 try {
283                     additionalProps.load(new StringReader(property));
284                 } catch (Exception e) {
285                     logger.warn(e.getMessage());
286                 }
287             }
288 
289             HashSet<String> skipPatterns = new HashSet<>();
290             if (skip != null) {
291                 BufferedReader reader = new BufferedReader(new FileReader(skip));
292                 String line;
293                 while ((line = reader.readLine()) != null) {
294                     line = line.trim();
295                     skipPatterns.add(line);
296                 }
297                 reader.close();
298             }
299 
300             if (!input.exists()) {
301                 logger.error("Input folder does not exist");
302                 System.exit(1);
303             }
304 
305             if (!output.exists()) {
306                 if (!output.mkdirs()) {
307                     logger.error("Unable to create output folder");
308                     System.exit(1);
309                 }
310             }
311 
312             AnnotationPipeline pipeline = null;
313             try {
314                 pipeline = new AnnotationPipeline(configFile, additionalProps);
315                 pipeline.loadModels();
316             } catch (Exception e) {
317                 e.printStackTrace();
318                 logger.error(e.getMessage());
319                 System.exit(1);
320             }
321 
322             FolderOrchestrator orchestrator = new FolderOrchestrator();
323             orchestrator.setMaxErrOnFile(maxFail);
324             orchestrator.setMaxSize(maxSize);
325 
326             String[] extensions = DEFAULT_EXTENSIONS;
327             Iterator<File> fileIterator = FileUtils.iterateFiles(input, extensions, true);
328 
329             TintopSession session = new TintopSession(input, output, fileIterator, skipPatterns);
330             orchestrator.run(session, pipeline, size);
331 
332             logger.info("Skipped: {}", orchestrator.skipped);
333 
334 //            String naf = request.getParameter("naf");
335 //            KAFDocument doc;
336 //
337 //            try {
338 //                doc = pipeline.parseFromString(naf);
339 //            } catch (Exception e) {
340 //                e.printStackTrace();
341 //                throw new Exception(e);
342 //            }
343 
344         } catch (Exception e) {
345             CommandLine.fail(e);
346         }
347 
348     }
349 }