1 package eu.fbk.dkm.pikes.tintop;
2
3 import com.google.common.io.Files;
4 import com.google.common.util.concurrent.ThreadFactoryBuilder;
5 import eu.fbk.dkm.pikes.tintopclient.TintopSession;
6 import eu.fbk.utils.core.CommandLine;
7 import eu.fbk.utils.core.FrequencyHashSet;
8 import eu.fbk.rdfpro.util.IO;
9 import ixa.kaflib.KAFDocument;
10 import org.apache.commons.io.FileUtils;
11 import org.apache.commons.io.IOUtils;
12 import org.jdom2.JDOMException;
13 import org.slf4j.Logger;
14 import org.slf4j.LoggerFactory;
15
import java.io.*;
import java.nio.charset.StandardCharsets;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
25
26
27
28
29
30 public class FolderOrchestrator {
31
32 private static final Logger logger = LoggerFactory.getLogger(FolderOrchestrator.class);
33 static final private int DEFAULT_MAX_ERR_ON_FILE = 5;
34 static final private int DEFAULT_MAX_SIZE = 50000;
35 static final private int DEFAULT_SIZE = 10;
36 static final private int DEFAULT_SLEEPING_TIME = 60000;
37
38 private int maxErrOnFile = DEFAULT_MAX_ERR_ON_FILE;
39 private int maxSize = DEFAULT_MAX_SIZE;
40 private FrequencyHashSet<String> fileOnError = new FrequencyHashSet<>();
41 private String fileCache = null;
42 private int skipped = 0;
43 public static String[] DEFAULT_EXTENSIONS = new String[] { "xml", "naf" };
44
45 public FolderOrchestrator() {
46 }
47
48 public int getMaxErrOnFile() {
49 return maxErrOnFile;
50 }
51
52 public void setMaxErrOnFile(int maxErrOnFile) {
53 this.maxErrOnFile = maxErrOnFile;
54 }
55
56 public int getMaxSize() {
57 return maxSize;
58 }
59
60 public void setMaxSize(int maxSize) {
61 this.maxSize = maxSize;
62 }
63
64 synchronized public void markFileAsNotDone(String filename) {
65 fileOnError.add(filename);
66 if (fileOnError.get(filename) <= maxErrOnFile) {
67 fileCache = filename;
68
69 } else {
70 logger.warn(String.format("File %s skipped, more than %d errors", filename, DEFAULT_MAX_ERR_ON_FILE));
71 }
72 }
73
74 public class LocalTintopClient implements Runnable {
75
76 TintopSession session;
77 AnnotationPipeline pipeline;
78
79 public LocalTintopClient(TintopSession session, AnnotationPipeline pipeline) {
80 this.session = session;
81 this.pipeline = pipeline;
82 }
83
84 @Override
85 public void run() {
86 while (true) {
87 String filename = null;
88 try {
89 filename = getNextFile(session);
90 if (filename == null) {
91 break;
92 }
93
94 File file = new File(filename);
95 if (!file.exists()) {
96 break;
97 }
98
99 File outputFile = getOutputFile(file, session);
100
101
102 outputFile = new File(outputFile.getAbsolutePath() + ".gz");
103
104 logger.debug("Output file: " + outputFile);
105
106 logger.info("Loading file: " + filename);
107 BufferedReader reader = new BufferedReader(new FileReader(filename));
108 String whole = IOUtils.toString(reader);
109 reader.close();
110
111 KAFDocument doc;
112
113 String naf;
114 try {
115 doc = pipeline.parseFromString(whole);
116 naf = doc.toString();
117 } catch (Throwable e) {
118 e.printStackTrace();
119 throw new Exception(e);
120 }
121
122 logger.debug(naf);
123 if (naf != null) {
124 logger.info("Writing file " + outputFile);
125 Files.createParentDirs(outputFile);
126 try (Writer w = IO.utf8Writer(IO.buffer(IO.write(outputFile.getAbsolutePath())))) {
127 w.write(naf);
128 }
129 }
130
131 } catch (final Throwable ex) {
132 logger.error(filename + " --- " + ex.getMessage());
133 markFileAsNotDone(filename);
134
135 try {
136 logger.info("Sleeping...");
137 Thread.sleep(DEFAULT_SLEEPING_TIME);
138 } catch (Exception e) {
139 logger.error(e.getMessage());
140 }
141 }
142 }
143 }
144
145 }
146
147 private File getOutputFile(File inputFile, TintopSession session) {
148 String outputFile = session.getOutput().getAbsolutePath() + inputFile.getAbsolutePath()
149 .substring(session.getInput().getAbsolutePath().length());
150 return new File(outputFile);
151 }
152
153 synchronized public String getNextFile(TintopSession session) {
154
155 fIter:
156 while (fileCache != null || session.getFileIterator().hasNext()) {
157
158 File file;
159 if (fileCache != null) {
160 file = new File(fileCache);
161 fileCache = null;
162 } else {
163 file = session.getFileIterator().next();
164 }
165
166 File outputFile = getOutputFile(file, session);
167
168
169 outputFile = new File(outputFile.getAbsolutePath() + ".gz");
170
171 logger.debug("Output file: " + outputFile);
172
173 if (outputFile.exists()) {
174 logger.debug("Skipping file (it exists): " + file);
175 continue fIter;
176 }
177
178 for (String p : session.getSkipPatterns()) {
179 if (file.toString().contains(p)) {
180 logger.debug("Skipping file (skip pattern): " + file);
181 continue fIter;
182 }
183 }
184
185 if (maxSize > 0 && file.length() > maxSize) {
186 logger.debug("Skipping file (too big, " + file.length() + "): " + file);
187 skipped++;
188 continue fIter;
189 }
190
191
192 if (file.length() < 1000) {
193 try {
194 KAFDocument document = KAFDocument.createFromFile(file);
195 if (document.getRawText() == null || document.getRawText().trim().length() == 0) {
196 logger.info("File is empty: " + file);
197 logger.info("Writing empty file " + outputFile);
198 Files.createParentDirs(outputFile);
199 try (Writer w = IO.utf8Writer(IO.buffer(IO.write(outputFile.getAbsolutePath())))) {
200 w.write(document.toString());
201 }
202 continue fIter;
203 }
204 } catch (IOException e) {
205 e.printStackTrace();
206 skipped++;
207 continue fIter;
208 } catch (JDOMException e) {
209 e.printStackTrace();
210 skipped++;
211 continue fIter;
212 }
213 }
214
215 return file.getAbsolutePath();
216 }
217
218 return null;
219 }
220
221 public void run(TintopSession session, AnnotationPipeline pipeline, int size) {
222 logger.info(String.format("Started process with %d server(s)", size));
223
224 final ThreadFactory factory = new ThreadFactoryBuilder().setDaemon(true).setNameFormat("client-%02d").build();
225 final ExecutorService executor = Executors.newCachedThreadPool(factory);
226 try {
227 for (int i = 0; i < size; i++) {
228 executor.submit(new LocalTintopClient(session, pipeline));
229 }
230
231 executor.shutdown();
232 executor.awaitTermination(Integer.MAX_VALUE, TimeUnit.MILLISECONDS);
233
234 } catch (final InterruptedException ex) {
235
236
237 } finally {
238 executor.shutdownNow();
239 }
240
241 }
242
243 public static void main(String[] args) {
244 try {
245 final CommandLine cmd = CommandLine
246 .parser()
247 .withName("./orchestrator-folder")
248 .withHeader("Run the Tintop Orchestrator in a particular folder")
249 .withOption("i", "input", "Input folder", "FOLDER",
250 CommandLine.Type.DIRECTORY_EXISTING, true, false, true)
251 .withOption("o", "output", "Output folder", "FOLDER",
252 CommandLine.Type.DIRECTORY, true, false, true)
253 .withOption(null, "skip", "Text file with list of file patterns to skip (one per line)", "FILE",
254 CommandLine.Type.FILE_EXISTING, true, false, false)
255 .withOption("m", "max-fail",
256 String.format("Max fails on a single file to skip (default %d)", DEFAULT_MAX_ERR_ON_FILE),
257 "INT", CommandLine.Type.INTEGER, true, false, false)
258 .withOption("z", "max-size",
259 String.format("Max size of a NAF empty file (default %d)", DEFAULT_MAX_SIZE),
260 "INT", CommandLine.Type.INTEGER, true, false, false)
261 .withOption("s", "size",
262 String.format("Number of threads (default %d)", DEFAULT_SIZE),
263 "INT", CommandLine.Type.INTEGER, true, false, false)
264 .withOption("c", "config", "Configuration file", "FILE", CommandLine.Type.FILE_EXISTING, true,
265 false, false)
266 .withOption(null, "properties", "Additional properties", "PROPS", CommandLine.Type.STRING, true,
267 true, false)
268 .withLogger(LoggerFactory.getLogger("eu.fbk")).parse(args);
269
270 File input = cmd.getOptionValue("input", File.class);
271 File output = cmd.getOptionValue("output", File.class);
272 File skip = cmd.getOptionValue("skip", File.class);
273 File configFile = cmd.getOptionValue("config", File.class);
274
275 Integer maxFail = cmd.getOptionValue("max-fail", Integer.class, DEFAULT_MAX_ERR_ON_FILE);
276 Integer maxSize = cmd.getOptionValue("max-size", Integer.class, DEFAULT_MAX_SIZE);
277 Integer size = cmd.getOptionValue("size", Integer.class, DEFAULT_SIZE);
278
279 List<String> addProperties = cmd.getOptionValues("properties", String.class);
280 Properties additionalProps = new Properties();
281 for (String property : addProperties) {
282 try {
283 additionalProps.load(new StringReader(property));
284 } catch (Exception e) {
285 logger.warn(e.getMessage());
286 }
287 }
288
289 HashSet<String> skipPatterns = new HashSet<>();
290 if (skip != null) {
291 BufferedReader reader = new BufferedReader(new FileReader(skip));
292 String line;
293 while ((line = reader.readLine()) != null) {
294 line = line.trim();
295 skipPatterns.add(line);
296 }
297 reader.close();
298 }
299
300 if (!input.exists()) {
301 logger.error("Input folder does not exist");
302 System.exit(1);
303 }
304
305 if (!output.exists()) {
306 if (!output.mkdirs()) {
307 logger.error("Unable to create output folder");
308 System.exit(1);
309 }
310 }
311
312 AnnotationPipeline pipeline = null;
313 try {
314 pipeline = new AnnotationPipeline(configFile, additionalProps);
315 pipeline.loadModels();
316 } catch (Exception e) {
317 e.printStackTrace();
318 logger.error(e.getMessage());
319 System.exit(1);
320 }
321
322 FolderOrchestrator orchestrator = new FolderOrchestrator();
323 orchestrator.setMaxErrOnFile(maxFail);
324 orchestrator.setMaxSize(maxSize);
325
326 String[] extensions = DEFAULT_EXTENSIONS;
327 Iterator<File> fileIterator = FileUtils.iterateFiles(input, extensions, true);
328
329 TintopSession session = new TintopSession(input, output, fileIterator, skipPatterns);
330 orchestrator.run(session, pipeline, size);
331
332 logger.info("Skipped: {}", orchestrator.skipped);
333
334
335
336
337
338
339
340
341
342
343
344 } catch (Exception e) {
345 CommandLine.fail(e);
346 }
347
348 }
349 }