1   package eu.fbk.dkm.pikes.tintopclient;
2   
import com.google.common.io.Files;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import eu.fbk.rdfpro.util.IO;
import eu.fbk.utils.core.CommandLine;
import eu.fbk.utils.core.FrequencyHashSet;
import ixa.kaflib.KAFDocument;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.IOUtils;
import org.jdom2.JDOMException;
import org.slf4j.LoggerFactory;

import java.io.*;
import java.nio.charset.StandardCharsets;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashSet;
import java.util.Iterator;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
22  
23  /**
24   * Created by alessio on 25/02/15.
25   */
26  
27  public class TintopOrchestrator {
28  
29      private static final org.slf4j.Logger logger = LoggerFactory.getLogger(TintopOrchestrator.class);
30      static final private int DEFAULT_MAX_ERR_ON_FILE = 5;
31      static final private int DEFAULT_MAX_SIZE = 50000;
32      static final private int DEFAULT_SLEEPING_TIME = 60000;
33  
34      private ArrayList<TintopServer> servers;
35      private boolean fake;
36      private String fileCache = null;
37      public static String[] DEFAULT_EXTENSIONS = new String[] { "xml", "naf" };
38  
39      private int skipped = 0;
40  
41      private FrequencyHashSet<String> fileOnError = new FrequencyHashSet<>();
42  
43      private int maxErrOnFile = DEFAULT_MAX_ERR_ON_FILE;
44      private int maxSize = DEFAULT_MAX_SIZE;
45      private int timeout = TintopClient.DEFAULT_TIMEOUT;
46      private int sleepingTime = DEFAULT_SLEEPING_TIME;
47  
48      public class RunnableTintopClient extends TintopClient implements Runnable {
49  
50          TintopSession session;
51  
52          public RunnableTintopClient(TintopServer server, TintopSession session) {
53              this(server, session, false);
54          }
55  
56          public RunnableTintopClient(TintopServer server, TintopSession session, boolean fake) {
57              super(server);
58              super.setFake(fake);
59              this.session = session;
60              setTimeout(timeout);
61          }
62  
63          @Override
64          public void run() {
65              Thread.currentThread().setName(server.getId() + " - " + server.getShortName());
66  
67              while (true) {
68                  String filename = null;
69                  try {
70                      filename = getNextFile(session);
71                      if (filename == null) {
72                          break;
73                      }
74  
75                      File file = new File(filename);
76                      if (!file.exists()) {
77                          break;
78                      }
79  
80                      File outputFile = getOutputFile(file, session);
81  
82                      // todo: use parameters
83                      outputFile = new File(outputFile.getAbsolutePath() + ".gz");
84  
85                      logger.debug("Output file: " + outputFile);
86  
87                      logger.info("Loading file: " + filename);
88                      BufferedReader reader = new BufferedReader(new FileReader(filename));
89                      String whole = IOUtils.toString(reader);
90                      reader.close();
91  
92                      String naf = call(whole);
93                      logger.debug(naf);
94                      if (naf != null) {
95                          logger.info("Writing file " + outputFile);
96                          Files.createParentDirs(outputFile);
97                          try (Writer w = IO.utf8Writer(IO.buffer(IO.write(outputFile.getAbsolutePath())))) {
98                              w.write(naf);
99                          }
100                     }
101 
102                 } catch (final Throwable ex) {
103                     logger.error(filename + " --- " + ex.getMessage());
104                     markFileAsNotDone(filename);
105 
106                     try {
107                         logger.info("Sleeping...");
108                         Thread.sleep(sleepingTime);
109                     } catch (Exception e) {
110                         logger.error(e.getMessage());
111                     }
112                 }
113             }
114         }
115 
116     }
117 
118     public TintopOrchestrator(ArrayList<TintopServer> servers, boolean fake) {
119         this.servers = servers;
120         this.fake = fake;
121     }
122 
123     public int getMaxErrOnFile() {
124         return maxErrOnFile;
125     }
126 
127     public void setMaxErrOnFile(int maxErrOnFile) {
128         this.maxErrOnFile = maxErrOnFile;
129     }
130 
131     public int getMaxSize() {
132         return maxSize;
133     }
134 
135     public void setMaxSize(int maxSize) {
136         this.maxSize = maxSize;
137     }
138 
139     public int getTimeout() {
140         return timeout;
141     }
142 
143     public void setTimeout(int timeout) {
144         this.timeout = timeout;
145     }
146 
147     public int getSleepingTime() {
148         return sleepingTime;
149     }
150 
151     public void setSleepingTime(int sleepingTime) {
152         this.sleepingTime = sleepingTime;
153     }
154 
155     private File getOutputFile(File inputFile, TintopSession session) {
156         String outputFile = session.getOutput().getAbsolutePath() + inputFile.getAbsolutePath()
157                 .substring(session.getInput().getAbsolutePath().length());
158         return new File(outputFile);
159     }
160 
161     synchronized public void markFileAsNotDone(String filename) {
162         fileOnError.add(filename);
163         if (fileOnError.get(filename) <= maxErrOnFile) {
164             fileCache = filename;
165 
166         } else {
167             logger.warn(String.format("File %s skipped, more than %d errors", filename, DEFAULT_MAX_ERR_ON_FILE));
168         }
169     }
170 
171     synchronized public String getNextFile(TintopSession session) {
172 
173         fIter:
174         while (fileCache != null || session.getFileIterator().hasNext()) {
175 
176             File file;
177             if (fileCache != null) {
178                 file = new File(fileCache);
179                 fileCache = null;
180             } else {
181                 file = session.getFileIterator().next();
182             }
183 
184             File outputFile = getOutputFile(file, session);
185 
186             // todo: use parameters
187             outputFile = new File(outputFile.getAbsolutePath() + ".gz");
188 
189             logger.debug("Output file: " + outputFile);
190 
191             if (outputFile.exists()) {
192                 logger.debug("Skipping file (it exists): " + file);
193                 continue fIter;
194             }
195 
196             for (String p : session.getSkipPatterns()) {
197                 if (file.toString().contains(p)) {
198                     logger.debug("Skipping file (skip pattern): " + file);
199                     continue fIter;
200                 }
201             }
202 
203             if (maxSize > 0 && file.length() > maxSize) {
204                 logger.debug("Skipping file (too big, " + file.length() + "): " + file);
205                 skipped++;
206                 continue fIter;
207             }
208 
209             // File is empty
210             if (file.length() < 1000) {
211                 try {
212                     KAFDocument document = KAFDocument.createFromFile(file);
213                     if (document.getRawText() == null || document.getRawText().trim().length() == 0) {
214                         logger.info("File is empty: " + file);
215                         logger.info("Writing empty file " + outputFile);
216                         Files.createParentDirs(outputFile);
217                         try (Writer w = IO.utf8Writer(IO.buffer(IO.write(outputFile.getAbsolutePath())))) {
218                             w.write(document.toString());
219                         }
220                         continue fIter;
221                     }
222                 } catch (IOException e) {
223                     e.printStackTrace();
224                     skipped++;
225                     continue fIter;
226                 } catch (JDOMException e) {
227                     e.printStackTrace();
228                     skipped++;
229                     continue fIter;
230                 }
231             }
232 
233             return file.getAbsolutePath();
234         }
235 
236         return null;
237     }
238 
239     public void run(TintopSession session) {
240         logger.info(String.format("Started process with %d server(s)", servers.size()));
241 
242         final ThreadFactory factory = new ThreadFactoryBuilder().setDaemon(true).setNameFormat("client-%02d").build();
243         final ExecutorService executor = Executors.newCachedThreadPool(factory);
244         try {
245             for (TintopServer server : servers) {
246                 executor.submit(new RunnableTintopClient(server, session, fake));
247             }
248 
249             executor.shutdown();
250             executor.awaitTermination(Integer.MAX_VALUE, TimeUnit.MILLISECONDS);
251 
252         } catch (final InterruptedException ex) {
253             // ignore
254 
255         } finally {
256             executor.shutdownNow();
257         }
258 
259     }
260 
261     public static void main(String[] args) {
262 
263         try {
264             final CommandLine cmd = CommandLine
265                     .parser()
266                     .withName("./orchestrator")
267                     .withHeader("Run the Tintop Orchestrator")
268                     .withOption("i", "input", "Input folder", "FOLDER",
269                             CommandLine.Type.DIRECTORY_EXISTING, true, false, true)
270                     .withOption("o", "output", "Output folder", "FOLDER",
271                             CommandLine.Type.DIRECTORY, true, false, true)
272                     .withOption("l", "list", "Text file with list of server (one per line)", "FILE",
273                             CommandLine.Type.FILE_EXISTING, true, false, true)
274                     .withOption("s", "skip", "Text file with list of file patterns to skip (one per line)", "FILE",
275                             CommandLine.Type.FILE_EXISTING, true, false, false)
276                     .withOption("m", "max-fail",
277                             String.format("Max fails on a single file to skip (default %d)", DEFAULT_MAX_ERR_ON_FILE),
278                             "INT", CommandLine.Type.INTEGER, true, false, false)
279                     .withOption("z", "max-size",
280                             String.format("Max size of a NAF empty file (default %d)", DEFAULT_MAX_SIZE),
281                             "INT", CommandLine.Type.INTEGER, true, false, false)
282                     .withOption("t", "timeout",
283                             String.format("Timeout in ms (default %d)", TintopClient.DEFAULT_TIMEOUT),
284                             "INT", CommandLine.Type.INTEGER, true, false, false)
285                     .withOption(null, "sleeping-time",
286                             String.format("Sleeping time for servers in ms (default %d)", DEFAULT_SLEEPING_TIME),
287                             "INT", CommandLine.Type.INTEGER, true, false, false)
288                     .withOption("F", "fake", "Fake execution")
289                     .withLogger(LoggerFactory.getLogger("eu.fbk")).parse(args);
290 
291             File input = cmd.getOptionValue("input", File.class);
292             File output = cmd.getOptionValue("output", File.class);
293             File serverList = cmd.getOptionValue("list", File.class);
294             File skip = cmd.getOptionValue("skip", File.class);
295 
296             Integer maxFail = cmd.getOptionValue("max-fail", Integer.class, DEFAULT_MAX_ERR_ON_FILE);
297             Integer maxSize = cmd.getOptionValue("max-size", Integer.class, DEFAULT_MAX_SIZE);
298             Integer timeout = cmd.getOptionValue("timeout", Integer.class, TintopClient.DEFAULT_TIMEOUT);
299             Integer sleepingTime = cmd.getOptionValue("sleeping-time", Integer.class, DEFAULT_SLEEPING_TIME);
300 
301             boolean fake = cmd.hasOption("fake");
302 
303             HashSet<String> skipPatterns = new HashSet<>();
304             if (skip != null) {
305                 BufferedReader reader = new BufferedReader(new FileReader(skip));
306                 String line;
307                 while ((line = reader.readLine()) != null) {
308                     line = line.trim();
309                     skipPatterns.add(line);
310                 }
311                 reader.close();
312             }
313 
314             if (!input.exists()) {
315                 logger.error("Input folder does not exist");
316                 System.exit(1);
317             }
318 
319             if (!output.exists()) {
320                 if (!output.mkdirs()) {
321                     logger.error("Unable to create output folder");
322                     System.exit(1);
323                 }
324             }
325 
326             ArrayList<TintopServer> tintopServers = new ArrayList<>();
327             BufferedReader reader = new BufferedReader(new FileReader(serverList));
328             String line;
329             while ((line = reader.readLine()) != null) {
330                 line = line.trim();
331                 if (line.length() == 0 || line.startsWith("#")) {
332                     continue;
333                 }
334                 try {
335                     TintopServer c = new TintopServer(line.trim());
336                     tintopServers.add(c);
337                 } catch (Exception e) {
338                     // ignore
339                 }
340             }
341 
342             String[] extensions = DEFAULT_EXTENSIONS;
343             Iterator<File> fileIterator = FileUtils.iterateFiles(input, extensions, true);
344 
345             TintopOrchestrator orchestrator = new TintopOrchestrator(tintopServers, fake);
346             orchestrator.setMaxErrOnFile(maxFail);
347             orchestrator.setMaxSize(maxSize);
348             orchestrator.setTimeout(timeout);
349             orchestrator.setSleepingTime(sleepingTime);
350 
351             TintopSession session = new TintopSession(input, output, fileIterator, skipPatterns);
352             orchestrator.run(session);
353 
354             logger.info("Skipped: {}", orchestrator.skipped);
355 
356         } catch (Exception e) {
357             CommandLine.fail(e);
358         }
359     }
360 }