kill 进程时删除临时文件

This commit is contained in:
景文凯 2019-11-12 17:14:15 +08:00
parent a24be3da86
commit 6aefce6aca

View File

@ -6,6 +6,7 @@ import cn.hutool.core.util.StrUtil;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.google.common.collect.Maps;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.wugui.dataxweb.dto.RunJobDto;
import com.wugui.dataxweb.entity.JobLog;
@ -21,8 +22,12 @@ import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;
import java.io.*;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.PrintWriter;
import java.io.UnsupportedEncodingException;
import java.util.List;
import java.util.concurrent.*;
@ -40,6 +45,9 @@ public class IDataxJobServiceImpl implements IDataxJobService {
new LinkedBlockingQueue<Runnable>(1024), namedThreadFactory, new ThreadPoolExecutor.AbortPolicy());
private String logFilePath;
private final static ConcurrentMap<String, String> jobTmpFiles = Maps.newConcurrentMap();
/**
* 日志文件保存目录
*/
@ -62,9 +70,12 @@ public class IDataxJobServiceImpl implements IDataxJobService {
}
try {
Process p = Runtime.getRuntime().exec(new String[]{"python", getDataXPyPath(), tmpFilePath});
EtlJobFileAppender.appendLog(logFilePath, "\n\nJob: " + jobConfigId + ",Datax运行进程Id: " + ProcessUtil.getProcessId(p));
ExecDataXOutputThread output=new ExecDataXOutputThread(p.getInputStream(),logFilePath,tmpFilePath);
ExecDataXOutputThread error =new ExecDataXOutputThread(p.getErrorStream(),logFilePath,tmpFilePath);
String processId = ProcessUtil.getProcessId(p);
log.info("Job:{},Datax运行进程Id:{}", jobConfigId, processId);
EtlJobFileAppender.appendLog(logFilePath, "\n\nJob: " + jobConfigId + ",Datax运行进程Id: " + processId);
jobTmpFiles.put(processId, tmpFilePath);
ExecDataXOutputThread output = new ExecDataXOutputThread(p.getInputStream(), logFilePath, tmpFilePath);
ExecDataXOutputThread error = new ExecDataXOutputThread(p.getErrorStream(), logFilePath, tmpFilePath);
output.start();
error.start();
} catch (Exception e) {
@ -122,9 +133,15 @@ public class IDataxJobServiceImpl implements IDataxJobService {
@Override
public Boolean killJob(String pid) {
boolean result = ProcessUtil.killProcessByPid(pid);
// 删除临时文件
//FileUtil.del(new File(tmpFilePath));
return ProcessUtil.killProcessByPid(pid);
if (!CollectionUtils.isEmpty(jobTmpFiles)) {
String pathname = jobTmpFiles.get(pid);
if (pathname != null) {
FileUtil.del(new File(pathname));
}
}
return result;
}
}