From 6aefce6acad7b69226f4e2b4210e35f5bd5c87f0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=99=AF=E6=96=87=E5=87=AF?= Date: Tue, 12 Nov 2019 17:14:15 +0800 Subject: [PATCH] =?UTF-8?q?kill=20=E8=BF=9B=E7=A8=8B=E6=97=B6=E5=88=A0?= =?UTF-8?q?=E9=99=A4=E4=B8=B4=E6=97=B6=E6=96=87=E4=BB=B6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../service/impl/IDataxJobServiceImpl.java | 29 +++++++++++++++---- 1 file changed, 23 insertions(+), 6 deletions(-) diff --git a/datax-web/src/main/java/com/wugui/dataxweb/service/impl/IDataxJobServiceImpl.java b/datax-web/src/main/java/com/wugui/dataxweb/service/impl/IDataxJobServiceImpl.java index a9e87ea2..47b25630 100644 --- a/datax-web/src/main/java/com/wugui/dataxweb/service/impl/IDataxJobServiceImpl.java +++ b/datax-web/src/main/java/com/wugui/dataxweb/service/impl/IDataxJobServiceImpl.java @@ -6,6 +6,7 @@ import cn.hutool.core.util.StrUtil; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject; import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper; +import com.google.common.collect.Maps; import com.google.common.util.concurrent.ThreadFactoryBuilder; import com.wugui.dataxweb.dto.RunJobDto; import com.wugui.dataxweb.entity.JobLog; @@ -21,8 +22,12 @@ import org.apache.commons.lang3.StringUtils; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Service; +import org.springframework.util.CollectionUtils; -import java.io.*; +import java.io.File; +import java.io.FileNotFoundException; +import java.io.PrintWriter; +import java.io.UnsupportedEncodingException; import java.util.List; import java.util.concurrent.*; @@ -40,6 +45,9 @@ public class IDataxJobServiceImpl implements IDataxJobService { new LinkedBlockingQueue(1024), namedThreadFactory, new ThreadPoolExecutor.AbortPolicy()); private String logFilePath; + + private final static ConcurrentMap jobTmpFiles = Maps.newConcurrentMap(); + /** * 日志文件保存目录 */ @@ -62,9 +70,12 @@ public class IDataxJobServiceImpl implements IDataxJobService { } try { Process p = Runtime.getRuntime().exec(new String[]{"python", getDataXPyPath(), tmpFilePath}); - EtlJobFileAppender.appendLog(logFilePath, "\n\nJob: " + jobConfigId + ",Datax运行进程Id: " + ProcessUtil.getProcessId(p)); - ExecDataXOutputThread output=new ExecDataXOutputThread(p.getInputStream(),logFilePath,tmpFilePath); - ExecDataXOutputThread error =new ExecDataXOutputThread(p.getErrorStream(),logFilePath,tmpFilePath); + String processId = ProcessUtil.getProcessId(p); + log.info("Job:{},Datax运行进程Id:{}", jobConfigId, processId); + EtlJobFileAppender.appendLog(logFilePath, "\n\nJob: " + jobConfigId + ",Datax运行进程Id: " + processId); + jobTmpFiles.put(processId, tmpFilePath); + ExecDataXOutputThread output = new ExecDataXOutputThread(p.getInputStream(), logFilePath, tmpFilePath); + ExecDataXOutputThread error = new ExecDataXOutputThread(p.getErrorStream(), logFilePath, tmpFilePath); output.start(); error.start(); } catch (Exception e) { @@ -122,9 +133,15 @@ public class IDataxJobServiceImpl implements IDataxJobService { @Override public Boolean killJob(String pid) { + boolean result = ProcessUtil.killProcessByPid(pid); // 删除临时文件 - //FileUtil.del(new File(tmpFilePath)); - return ProcessUtil.killProcessByPid(pid); + if (!CollectionUtils.isEmpty(jobTmpFiles)) { + String pathname = jobTmpFiles.get(pid); + if (pathname != null) { + FileUtil.del(new File(pathname)); + } + } + return result; } }