1 package eu.fbk.dkm.pikes.tintopclient;
2
3 import com.google.common.io.Files;
4 import com.google.common.util.concurrent.ThreadFactoryBuilder;
5 import eu.fbk.rdfpro.util.IO;
6 import eu.fbk.utils.core.CommandLine;
7 import eu.fbk.utils.core.FrequencyHashSet;
8 import ixa.kaflib.KAFDocument;
9 import org.apache.commons.io.FileUtils;
10 import org.apache.commons.io.IOUtils;
11 import org.jdom2.JDOMException;
12 import org.slf4j.LoggerFactory;
13
14 import java.io.*;
15 import java.util.ArrayList;
16 import java.util.HashSet;
17 import java.util.Iterator;
18 import java.util.concurrent.ExecutorService;
19 import java.util.concurrent.Executors;
20 import java.util.concurrent.ThreadFactory;
21 import java.util.concurrent.TimeUnit;
22
23
24
25
26
27 public class TintopOrchestrator {
28
29 private static final org.slf4j.Logger logger = LoggerFactory.getLogger(TintopOrchestrator.class);
30 static final private int DEFAULT_MAX_ERR_ON_FILE = 5;
31 static final private int DEFAULT_MAX_SIZE = 50000;
32 static final private int DEFAULT_SLEEPING_TIME = 60000;
33
34 private ArrayList<TintopServer> servers;
35 private boolean fake;
36 private String fileCache = null;
37 public static String[] DEFAULT_EXTENSIONS = new String[] { "xml", "naf" };
38
39 private int skipped = 0;
40
41 private FrequencyHashSet<String> fileOnError = new FrequencyHashSet<>();
42
43 private int maxErrOnFile = DEFAULT_MAX_ERR_ON_FILE;
44 private int maxSize = DEFAULT_MAX_SIZE;
45 private int timeout = TintopClient.DEFAULT_TIMEOUT;
46 private int sleepingTime = DEFAULT_SLEEPING_TIME;
47
48 public class RunnableTintopClient extends TintopClient implements Runnable {
49
50 TintopSession session;
51
52 public RunnableTintopClient(TintopServer server, TintopSession session) {
53 this(server, session, false);
54 }
55
56 public RunnableTintopClient(TintopServer server, TintopSession session, boolean fake) {
57 super(server);
58 super.setFake(fake);
59 this.session = session;
60 setTimeout(timeout);
61 }
62
63 @Override
64 public void run() {
65 Thread.currentThread().setName(server.getId() + " - " + server.getShortName());
66
67 while (true) {
68 String filename = null;
69 try {
70 filename = getNextFile(session);
71 if (filename == null) {
72 break;
73 }
74
75 File file = new File(filename);
76 if (!file.exists()) {
77 break;
78 }
79
80 File outputFile = getOutputFile(file, session);
81
82
83 outputFile = new File(outputFile.getAbsolutePath() + ".gz");
84
85 logger.debug("Output file: " + outputFile);
86
87 logger.info("Loading file: " + filename);
88 BufferedReader reader = new BufferedReader(new FileReader(filename));
89 String whole = IOUtils.toString(reader);
90 reader.close();
91
92 String naf = call(whole);
93 logger.debug(naf);
94 if (naf != null) {
95 logger.info("Writing file " + outputFile);
96 Files.createParentDirs(outputFile);
97 try (Writer w = IO.utf8Writer(IO.buffer(IO.write(outputFile.getAbsolutePath())))) {
98 w.write(naf);
99 }
100 }
101
102 } catch (final Throwable ex) {
103 logger.error(filename + " --- " + ex.getMessage());
104 markFileAsNotDone(filename);
105
106 try {
107 logger.info("Sleeping...");
108 Thread.sleep(sleepingTime);
109 } catch (Exception e) {
110 logger.error(e.getMessage());
111 }
112 }
113 }
114 }
115
116 }
117
/**
 * Creates an orchestrator over the given servers.
 *
 * @param servers servers that will each get one worker when {@code run} is called
 * @param fake    flag handed to every worker client
 */
public TintopOrchestrator(ArrayList<TintopServer> servers, boolean fake) {
    this.fake = fake;
    this.servers = servers;
}
122
123 public int getMaxErrOnFile() {
124 return maxErrOnFile;
125 }
126
127 public void setMaxErrOnFile(int maxErrOnFile) {
128 this.maxErrOnFile = maxErrOnFile;
129 }
130
131 public int getMaxSize() {
132 return maxSize;
133 }
134
135 public void setMaxSize(int maxSize) {
136 this.maxSize = maxSize;
137 }
138
139 public int getTimeout() {
140 return timeout;
141 }
142
143 public void setTimeout(int timeout) {
144 this.timeout = timeout;
145 }
146
147 public int getSleepingTime() {
148 return sleepingTime;
149 }
150
151 public void setSleepingTime(int sleepingTime) {
152 this.sleepingTime = sleepingTime;
153 }
154
155 private File getOutputFile(File inputFile, TintopSession session) {
156 String outputFile = session.getOutput().getAbsolutePath() + inputFile.getAbsolutePath()
157 .substring(session.getInput().getAbsolutePath().length());
158 return new File(outputFile);
159 }
160
161 synchronized public void markFileAsNotDone(String filename) {
162 fileOnError.add(filename);
163 if (fileOnError.get(filename) <= maxErrOnFile) {
164 fileCache = filename;
165
166 } else {
167 logger.warn(String.format("File %s skipped, more than %d errors", filename, DEFAULT_MAX_ERR_ON_FILE));
168 }
169 }
170
171 synchronized public String getNextFile(TintopSession session) {
172
173 fIter:
174 while (fileCache != null || session.getFileIterator().hasNext()) {
175
176 File file;
177 if (fileCache != null) {
178 file = new File(fileCache);
179 fileCache = null;
180 } else {
181 file = session.getFileIterator().next();
182 }
183
184 File outputFile = getOutputFile(file, session);
185
186
187 outputFile = new File(outputFile.getAbsolutePath() + ".gz");
188
189 logger.debug("Output file: " + outputFile);
190
191 if (outputFile.exists()) {
192 logger.debug("Skipping file (it exists): " + file);
193 continue fIter;
194 }
195
196 for (String p : session.getSkipPatterns()) {
197 if (file.toString().contains(p)) {
198 logger.debug("Skipping file (skip pattern): " + file);
199 continue fIter;
200 }
201 }
202
203 if (maxSize > 0 && file.length() > maxSize) {
204 logger.debug("Skipping file (too big, " + file.length() + "): " + file);
205 skipped++;
206 continue fIter;
207 }
208
209
210 if (file.length() < 1000) {
211 try {
212 KAFDocument document = KAFDocument.createFromFile(file);
213 if (document.getRawText() == null || document.getRawText().trim().length() == 0) {
214 logger.info("File is empty: " + file);
215 logger.info("Writing empty file " + outputFile);
216 Files.createParentDirs(outputFile);
217 try (Writer w = IO.utf8Writer(IO.buffer(IO.write(outputFile.getAbsolutePath())))) {
218 w.write(document.toString());
219 }
220 continue fIter;
221 }
222 } catch (IOException e) {
223 e.printStackTrace();
224 skipped++;
225 continue fIter;
226 } catch (JDOMException e) {
227 e.printStackTrace();
228 skipped++;
229 continue fIter;
230 }
231 }
232
233 return file.getAbsolutePath();
234 }
235
236 return null;
237 }
238
239 public void run(TintopSession session) {
240 logger.info(String.format("Started process with %d server(s)", servers.size()));
241
242 final ThreadFactory factory = new ThreadFactoryBuilder().setDaemon(true).setNameFormat("client-%02d").build();
243 final ExecutorService executor = Executors.newCachedThreadPool(factory);
244 try {
245 for (TintopServer server : servers) {
246 executor.submit(new RunnableTintopClient(server, session, fake));
247 }
248
249 executor.shutdown();
250 executor.awaitTermination(Integer.MAX_VALUE, TimeUnit.MILLISECONDS);
251
252 } catch (final InterruptedException ex) {
253
254
255 } finally {
256 executor.shutdownNow();
257 }
258
259 }
260
261 public static void main(String[] args) {
262
263 try {
264 final CommandLine cmd = CommandLine
265 .parser()
266 .withName("./orchestrator")
267 .withHeader("Run the Tintop Orchestrator")
268 .withOption("i", "input", "Input folder", "FOLDER",
269 CommandLine.Type.DIRECTORY_EXISTING, true, false, true)
270 .withOption("o", "output", "Output folder", "FOLDER",
271 CommandLine.Type.DIRECTORY, true, false, true)
272 .withOption("l", "list", "Text file with list of server (one per line)", "FILE",
273 CommandLine.Type.FILE_EXISTING, true, false, true)
274 .withOption("s", "skip", "Text file with list of file patterns to skip (one per line)", "FILE",
275 CommandLine.Type.FILE_EXISTING, true, false, false)
276 .withOption("m", "max-fail",
277 String.format("Max fails on a single file to skip (default %d)", DEFAULT_MAX_ERR_ON_FILE),
278 "INT", CommandLine.Type.INTEGER, true, false, false)
279 .withOption("z", "max-size",
280 String.format("Max size of a NAF empty file (default %d)", DEFAULT_MAX_SIZE),
281 "INT", CommandLine.Type.INTEGER, true, false, false)
282 .withOption("t", "timeout",
283 String.format("Timeout in ms (default %d)", TintopClient.DEFAULT_TIMEOUT),
284 "INT", CommandLine.Type.INTEGER, true, false, false)
285 .withOption(null, "sleeping-time",
286 String.format("Sleeping time for servers in ms (default %d)", DEFAULT_SLEEPING_TIME),
287 "INT", CommandLine.Type.INTEGER, true, false, false)
288 .withOption("F", "fake", "Fake execution")
289 .withLogger(LoggerFactory.getLogger("eu.fbk")).parse(args);
290
291 File input = cmd.getOptionValue("input", File.class);
292 File output = cmd.getOptionValue("output", File.class);
293 File serverList = cmd.getOptionValue("list", File.class);
294 File skip = cmd.getOptionValue("skip", File.class);
295
296 Integer maxFail = cmd.getOptionValue("max-fail", Integer.class, DEFAULT_MAX_ERR_ON_FILE);
297 Integer maxSize = cmd.getOptionValue("max-size", Integer.class, DEFAULT_MAX_SIZE);
298 Integer timeout = cmd.getOptionValue("timeout", Integer.class, TintopClient.DEFAULT_TIMEOUT);
299 Integer sleepingTime = cmd.getOptionValue("sleeping-time", Integer.class, DEFAULT_SLEEPING_TIME);
300
301 boolean fake = cmd.hasOption("fake");
302
303 HashSet<String> skipPatterns = new HashSet<>();
304 if (skip != null) {
305 BufferedReader reader = new BufferedReader(new FileReader(skip));
306 String line;
307 while ((line = reader.readLine()) != null) {
308 line = line.trim();
309 skipPatterns.add(line);
310 }
311 reader.close();
312 }
313
314 if (!input.exists()) {
315 logger.error("Input folder does not exist");
316 System.exit(1);
317 }
318
319 if (!output.exists()) {
320 if (!output.mkdirs()) {
321 logger.error("Unable to create output folder");
322 System.exit(1);
323 }
324 }
325
326 ArrayList<TintopServer> tintopServers = new ArrayList<>();
327 BufferedReader reader = new BufferedReader(new FileReader(serverList));
328 String line;
329 while ((line = reader.readLine()) != null) {
330 line = line.trim();
331 if (line.length() == 0 || line.startsWith("#")) {
332 continue;
333 }
334 try {
335 TintopServer c = new TintopServer(line.trim());
336 tintopServers.add(c);
337 } catch (Exception e) {
338
339 }
340 }
341
342 String[] extensions = DEFAULT_EXTENSIONS;
343 Iterator<File> fileIterator = FileUtils.iterateFiles(input, extensions, true);
344
345 TintopOrchestrator orchestrator = new TintopOrchestrator(tintopServers, fake);
346 orchestrator.setMaxErrOnFile(maxFail);
347 orchestrator.setMaxSize(maxSize);
348 orchestrator.setTimeout(timeout);
349 orchestrator.setSleepingTime(sleepingTime);
350
351 TintopSession session = new TintopSession(input, output, fileIterator, skipPatterns);
352 orchestrator.run(session);
353
354 logger.info("Skipped: {}", orchestrator.skipped);
355
356 } catch (Exception e) {
357 CommandLine.fail(e);
358 }
359 }
360 }