diff --git a/common/src/main/java/com/alibaba/datax/common/log/EtlJobLogger.java b/common/src/main/java/com/alibaba/datax/common/log/EtlJobLogger.java index 4e709ca4..4d42b907 100644 --- a/common/src/main/java/com/alibaba/datax/common/log/EtlJobLogger.java +++ b/common/src/main/java/com/alibaba/datax/common/log/EtlJobLogger.java @@ -6,8 +6,7 @@ import org.slf4j.LoggerFactory; import org.slf4j.helpers.FormattingTuple; import org.slf4j.helpers.MessageFormatter; -import java.io.PrintWriter; -import java.io.StringWriter; +import java.io.*; import java.util.Date; /** @@ -85,4 +84,93 @@ public class EtlJobLogger { logDetail(callInfo, appendLog); } + /** + * support read log-file + * + * @param logFileName + * @return log content + */ + public static LogResult readLog(String logFileName, int fromLineNum) { + + // valid log file + if (logFileName == null || logFileName.trim().length() == 0) { + return new LogResult(fromLineNum, 0, "readLog fail, logFile not found", true); + } + File logFile = new File(logFileName); + + if (!logFile.exists()) { + return new LogResult(fromLineNum, 0, "readLog fail, logFile not exists", true); + } + + // read file + StringBuffer logContentBuffer = new StringBuffer(); + int toLineNum = 0; + LineNumberReader reader = null; + try { + //reader = new LineNumberReader(new FileReader(logFile)); + reader = new LineNumberReader(new InputStreamReader(new FileInputStream(logFile), "utf-8")); + String line = null; + + while ((line = reader.readLine()) != null) { + toLineNum = reader.getLineNumber(); // [from, to], start as 1 + if (toLineNum >= fromLineNum) { + logContentBuffer.append(line).append("\n"); + } + } + } catch (IOException e) { + logger.error(e.getMessage(), e); + } finally { + if (reader != null) { + try { + reader.close(); + } catch (IOException e) { + logger.error(e.getMessage(), e); + } + } + } + + // result + LogResult logResult = new LogResult(fromLineNum, toLineNum, logContentBuffer.toString(), false); + return logResult; + + /* + // it will return the number of characters actually skipped + reader.skip(Long.MAX_VALUE); + int maxLineNum = reader.getLineNumber(); + maxLineNum++; // 最大行号 + */ + } + + /** + * read log data + * + * @param logFile + * @return log line content + */ + public static String readLines(File logFile) { + BufferedReader reader = null; + try { + reader = new BufferedReader(new InputStreamReader(new FileInputStream(logFile), "utf-8")); + if (reader != null) { + StringBuilder sb = new StringBuilder(); + String line = null; + while ((line = reader.readLine()) != null) { + sb.append(line).append("\n"); + } + return sb.toString(); + } + } catch (IOException e) { + logger.error(e.getMessage(), e); + } finally { + if (reader != null) { + try { + reader.close(); + } catch (IOException e) { + logger.error(e.getMessage(), e); + } + } + } + return null; + } + } diff --git a/common/src/main/java/com/alibaba/datax/common/log/LogResult.java b/common/src/main/java/com/alibaba/datax/common/log/LogResult.java new file mode 100644 index 00000000..998e510c --- /dev/null +++ b/common/src/main/java/com/alibaba/datax/common/log/LogResult.java @@ -0,0 +1,54 @@ +package com.alibaba.datax.common.log; + +import java.io.Serializable; + +/** + * Created by xuxueli on 17/3/23. + */ +public class LogResult implements Serializable { + private static final long serialVersionUID = 42L; + + public LogResult(int fromLineNum, int toLineNum, String logContent, boolean isEnd) { + this.fromLineNum = fromLineNum; + this.toLineNum = toLineNum; + this.logContent = logContent; + this.isEnd = isEnd; + } + + private int fromLineNum; + private int toLineNum; + private String logContent; + private boolean isEnd; + + public int getFromLineNum() { + return fromLineNum; + } + + public void setFromLineNum(int fromLineNum) { + this.fromLineNum = fromLineNum; + } + + public int getToLineNum() { + return toLineNum; + } + + public void setToLineNum(int toLineNum) { + this.toLineNum = toLineNum; + } + + public String getLogContent() { + return logContent; + } + + public void setLogContent(String logContent) { + this.logContent = logContent; + } + + public boolean isEnd() { + return isEnd; + } + + public void setEnd(boolean end) { + isEnd = end; + } +} diff --git a/datax-web/src/main/java/com/wugui/dataxweb/controller/JobController.java b/datax-web/src/main/java/com/wugui/dataxweb/controller/JobController.java index 7ad0f073..df9b5b8b 100644 --- a/datax-web/src/main/java/com/wugui/dataxweb/controller/JobController.java +++ b/datax-web/src/main/java/com/wugui/dataxweb/controller/JobController.java @@ -1,5 +1,6 @@ package com.wugui.dataxweb.controller; +import com.alibaba.datax.common.log.LogResult; import com.baomidou.mybatisplus.extension.api.R; import com.wugui.dataxweb.dto.RunJobDto; import com.wugui.dataxweb.service.IDataxJobService; @@ -126,6 +127,7 @@ public class JobController { /** * 通过接口传入json配置启动一个datax作业 + * * @param jobJson * @return */ @@ -149,4 +151,15 @@ public class JobController { return R.ok(result); } + /** + * 根据任务id查询日志 + * + * @param id + * @return + */ + @ApiOperation("查看任务抽取日志,id为任务id,fromLineNum为读取的行数") + @GetMapping("/viewJobLog") + public R viewJogLog(Long id, int fromLineNum) { + return R.ok(iDataxJobService.viewJogLog(id, fromLineNum)); + } } diff --git a/datax-web/src/main/java/com/wugui/dataxweb/service/IDataxJobService.java b/datax-web/src/main/java/com/wugui/dataxweb/service/IDataxJobService.java index e4d61900..d1517e3b 100644 --- a/datax-web/src/main/java/com/wugui/dataxweb/service/IDataxJobService.java +++ b/datax-web/src/main/java/com/wugui/dataxweb/service/IDataxJobService.java @@ -1,5 +1,6 @@ package com.wugui.dataxweb.service; +import com.alibaba.datax.common.log.LogResult; import com.wugui.dataxweb.dto.RunJobDto; /** @@ -8,14 +9,16 @@ import com.wugui.dataxweb.dto.RunJobDto; * @create: 2019-06-17 11:25 **/ public interface IDataxJobService { - /** - * 根据json字符串用线程池启动一个datax作业 - * - * @author: huzekang - * @Date: 2019-06-17 + /** + * 根据json字符串用线程池启动一个datax作业 + * * @param jobJson - */ - String startJobByJsonStr(String jobJson); + * @author: huzekang + * @Date: 2019-06-17 + */ + String startJobByJsonStr(String jobJson); String startJobLog(RunJobDto runJobDto); + + LogResult viewJogLog(Long id, int fromLineNum); } diff --git a/datax-web/src/main/java/com/wugui/dataxweb/service/impl/IDataxJobServiceImpl.java b/datax-web/src/main/java/com/wugui/dataxweb/service/impl/IDataxJobServiceImpl.java index 736fd3e7..5ddeb3ef 100644 --- a/datax-web/src/main/java/com/wugui/dataxweb/service/impl/IDataxJobServiceImpl.java +++ b/datax-web/src/main/java/com/wugui/dataxweb/service/impl/IDataxJobServiceImpl.java @@ -3,6 +3,8 @@ package com.wugui.dataxweb.service.impl; import cn.hutool.core.io.FileUtil; import cn.hutool.core.util.StrUtil; import com.alibaba.datax.common.exception.DataXException; +import com.alibaba.datax.common.log.EtlJobLogger; +import com.alibaba.datax.common.log.LogResult; import com.alibaba.datax.common.spi.ErrorCode; import com.alibaba.datax.core.Engine; import com.alibaba.datax.core.util.ExceptionTracker; @@ -10,6 +12,7 @@ import com.alibaba.datax.core.util.FrameworkErrorCode; import com.alibaba.datax.core.util.container.CoreConstant; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject; +import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper; import com.google.common.util.concurrent.ThreadFactoryBuilder; import com.wugui.dataxweb.dto.RunJobDto; import com.wugui.dataxweb.entity.JobLog; @@ -24,6 +27,7 @@ import java.io.File; import java.io.FileNotFoundException; import java.io.PrintWriter; import java.io.UnsupportedEncodingException; +import java.util.List; import java.util.concurrent.*; /** @@ -46,7 +50,7 @@ public class IDataxJobServiceImpl implements IDataxJobService { private String etlLogDir; @Autowired - private IJobLogService iJobLogService; + private IJobLogService jobLogService; @Override @@ -103,11 +107,26 @@ public class IDataxJobServiceImpl implements IDataxJobService { JobLog jobLog = new JobLog(); jobLog.setJobId(runJobDto.getJobConfigId()); jobLog.setLogFilePath(logFilePath); - iJobLogService.save(jobLog); + jobLogService.save(jobLog); //将路径放进去 json.put(CoreConstant.LOG_FILE_PATH, logFilePath); //启动任务 return startJobByJsonStr(JSON.toJSONString(json)); } + + @Override + public LogResult viewJogLog(Long id, int fromLineNum) { + QueryWrapper queryWrapper = new QueryWrapper<>(); + //根据id获取最新的日志文件路径 + queryWrapper.lambda().eq(JobLog::getJobId, id).orderByDesc(JobLog::getCreateDate); + List list = jobLogService.list(queryWrapper); + //取最新的一条记录 + if (list.isEmpty()) { + return new LogResult(1, 1, "没有找到对应的日志文件!", true); + } else { + //取出路径,读取文件 + return EtlJobLogger.readLog(list.get(0).getLogFilePath(), fromLineNum); + } + } }