新增job运行记录接口

新增job运行状态更新功能
从datax日志中过滤异常并更新job状态
This commit is contained in:
景文凯 2019-11-14 08:59:48 +08:00
parent 4d0634e820
commit a5bb7fdf33
11 changed files with 305 additions and 95 deletions

View File

@ -105,41 +105,38 @@ CREATE TABLE `job_log` (
`id` bigint(20) NOT NULL AUTO_INCREMENT,
`job_id` bigint(20) NOT NULL COMMENT '抽取任务主键ID',
`log_file_path` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL COMMENT '日志文件路径',
`update_date` datetime(0) NULL DEFAULT NULL,
`status` int(1) NULL DEFAULT 1,
`pid` varchar(20) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL COMMENT '任务进程ID',
`handle_time` datetime(0) NULL DEFAULT NULL COMMENT '执行-时间',
`handle_code` int(11) NULL DEFAULT 0 COMMENT '执行-状态',
`handle_msg` text CHARACTER SET utf8 COLLATE utf8_general_ci NULL COMMENT '执行-日志',
`create_by` int(11) NULL DEFAULT NULL,
`create_date` datetime(0) NOT NULL DEFAULT CURRENT_TIMESTAMP(0) ON UPDATE CURRENT_TIMESTAMP(0),
`update_by` int(11) NULL DEFAULT NULL,
PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB AUTO_INCREMENT = 26 CHARACTER SET = utf8 COLLATE = utf8_general_ci COMMENT = '抽取日志记录表' ROW_FORMAT = Dynamic;
) ENGINE = InnoDB AUTO_INCREMENT = 0 CHARACTER SET = utf8 COLLATE = utf8_general_ci COMMENT = '抽取日志记录表' ROW_FORMAT = Dynamic;
-- ----------------------------
-- Records of job_log
-- ----------------------------
INSERT INTO `job_log` VALUES (1, 1, '/data/applogs/datax-web/1_1561638145701', NULL, 1, NULL, '2019-06-27 08:22:26', NULL);
INSERT INTO `job_log` VALUES (2, 1, '/data/applogs/datax-web/1_1561638359975.log', NULL, 1, NULL, '2019-06-27 08:26:00', NULL);
INSERT INTO `job_log` VALUES (3, 1, '/data/applogs/datax-web/1_1561638760584.log', NULL, 1, NULL, '2019-06-27 08:32:41', NULL);
INSERT INTO `job_log` VALUES (4, 4, '/data/applogs/datax-web/4_1564480870563.log', '2019-07-30 18:01:11', 1, NULL, '2019-07-30 18:01:11', NULL);
INSERT INTO `job_log` VALUES (5, 3, '/data/applogs/datax-web/3_1564481419699.log', '2019-07-30 18:10:20', 1, NULL, '2019-07-30 18:10:20', NULL);
INSERT INTO `job_log` VALUES (6, 3, '/data/applogs/datax-web/3_1564485764734.log', '2019-07-30 19:22:45', 1, NULL, '2019-07-30 19:22:45', NULL);
INSERT INTO `job_log` VALUES (7, 3, '/data/applogs/datax-web/3_1564485918860.log', '2019-07-30 19:25:19', 1, NULL, '2019-07-30 19:25:19', NULL);
INSERT INTO `job_log` VALUES (8, 3, '/data/applogs/datax-web/3_1564486087223.log', '2019-07-30 19:28:07', 1, NULL, '2019-07-30 19:28:07', NULL);
INSERT INTO `job_log` VALUES (9, 3, '/data/applogs/datax-web/3_1564486152278.log', '2019-07-30 19:29:12', 1, NULL, '2019-07-30 19:29:12', NULL);
INSERT INTO `job_log` VALUES (10, 3, '/data/applogs/datax-web/3_1564486351631.log', '2019-07-30 19:32:32', 1, NULL, '2019-07-30 19:32:32', NULL);
INSERT INTO `job_log` VALUES (11, 3, '/data/applogs/datax-web/3_1564486375214.log', '2019-07-30 19:32:55', 1, NULL, '2019-07-30 19:32:55', NULL);
INSERT INTO `job_log` VALUES (12, 3, '/data/applogs/datax-web/3_1564486398393.log', '2019-07-30 19:33:18', 1, NULL, '2019-07-30 19:33:18', NULL);
INSERT INTO `job_log` VALUES (13, 3, '/data/applogs/datax-web/3_1564488007122.log', '2019-07-30 20:00:07', 1, NULL, '2019-07-30 20:00:07', NULL);
INSERT INTO `job_log` VALUES (14, 3, '/data/applogs/datax-web/3_1564489795800.log', '2019-07-30 20:29:56', 1, NULL, '2019-07-30 20:29:56', NULL);
INSERT INTO `job_log` VALUES (15, 3, '/data/applogs/datax-web/3_1564490723427.log', '2019-07-30 20:45:23', 1, NULL, '2019-07-30 20:45:23', NULL);
INSERT INTO `job_log` VALUES (16, 3, '/data/applogs/datax-web/3_1564490843863.log', '2019-07-30 20:47:24', 1, NULL, '2019-07-30 20:47:24', NULL);
INSERT INTO `job_log` VALUES (17, 3, '/data/applogs/datax-web/3_1564491445033.log', '2019-07-30 20:57:25', 1, NULL, '2019-07-30 20:57:25', NULL);
INSERT INTO `job_log` VALUES (18, 3, '/data/applogs/datax-web/3_1564491868935.log', '2019-07-30 21:04:29', 1, NULL, '2019-07-30 21:04:29', NULL);
INSERT INTO `job_log` VALUES (19, 3, '/data/applogs/datax-web/3_1564492047112.log', '2019-07-30 21:07:27', 1, NULL, '2019-07-30 21:07:27', NULL);
INSERT INTO `job_log` VALUES (20, 3, '/data/applogs/datax-web/3_1564492173290.log', '2019-07-30 21:09:33', 1, NULL, '2019-07-30 21:09:33', NULL);
INSERT INTO `job_log` VALUES (21, 3, '/data/applogs/datax-web/3_1564492308532.log', '2019-07-30 21:11:49', 1, NULL, '2019-07-30 21:11:49', NULL);
INSERT INTO `job_log` VALUES (22, 3, '/data/applogs/datax-web/3_1564492378732.log', '2019-07-30 21:12:59', 1, NULL, '2019-07-30 21:12:59', NULL);
INSERT INTO `job_log` VALUES (23, 3, '/data/applogs/datax-web/3_1564495260737.log', '2019-07-30 22:01:01', 1, NULL, '2019-07-30 22:01:01', NULL);
INSERT INTO `job_log` VALUES (24, 3, '/data/applogs/datax-web/3_1564574180078.log', '2019-07-31 19:56:20', 1, NULL, '2019-07-31 19:56:20', NULL);
INSERT INTO `job_log` VALUES (25, 3, '/data/applogs/datax-web/3_1564574319878.log', '2019-07-31 19:58:40', 1, NULL, '2019-07-31 19:58:40', NULL);
INSERT INTO `job_log` VALUES (12, 4, 'D:\\temp\\logs\\datax-web\\4_1573628574883.log', '88612', '2019-11-13 15:02:55', 0, NULL, NULL);
INSERT INTO `job_log` VALUES (13, 4, 'D:\\temp\\logs\\datax-web\\4_1573628575201.log', '88772', '2019-11-13 15:02:55', 0, NULL, NULL);
INSERT INTO `job_log` VALUES (14, 4, 'D:\\temp\\logs\\datax-web\\4_1573628576097.log', '88984', '2019-11-13 15:02:56', 0, NULL, NULL);
INSERT INTO `job_log` VALUES (15, 4, 'D:\\temp\\logs\\datax-web\\4_1573628589106.log', '89224', '2019-11-13 15:03:09', 0, NULL, NULL);
INSERT INTO `job_log` VALUES (16, 4, 'D:\\temp\\logs\\datax-web\\4_1573628589293.log', '89364', '2019-11-13 15:03:09', 0, NULL, NULL);
INSERT INTO `job_log` VALUES (17, 4, 'D:\\temp\\logs\\datax-web\\4_1573628589509.log', '89520', '2019-11-13 15:03:10', 0, NULL, NULL);
INSERT INTO `job_log` VALUES (18, 4, 'D:\\temp\\logs\\datax-web\\4_1573628589670.log', '89652', '2019-11-13 15:03:10', 0, NULL, NULL);
INSERT INTO `job_log` VALUES (19, 4, 'D:\\temp\\logs\\datax-web\\4_1573628589859.log', '89720', '2019-11-13 15:03:10', 0, NULL, NULL);
INSERT INTO `job_log` VALUES (20, 4, 'D:\\temp\\logs\\datax-web\\4_1573628590201.log', '89832', '2019-11-13 15:03:10', 0, NULL, NULL);
INSERT INTO `job_log` VALUES (21, 4, 'D:\\temp\\logs\\datax-web\\4_1573628590325.log', '89912', '2019-11-13 15:03:10', 0, NULL, NULL);
INSERT INTO `job_log` VALUES (22, 6, 'D:\\temp\\logs\\datax-web\\6_1573630200227.log', '18352', '2019-11-13 15:30:00', 0, NULL, NULL);
INSERT INTO `job_log` VALUES (23, 6, 'D:\\temp\\logs\\datax-web\\6_1573630558990.log', '74672', '2019-11-13 15:35:59', 0, NULL, NULL);
INSERT INTO `job_log` VALUES (24, 6, 'D:\\temp\\logs\\datax-web\\6_1573635226093.log', '93744', '2019-11-13 16:53:46', 0, NULL, NULL);
INSERT INTO `job_log` VALUES (25, 4, 'D:\\temp\\logs\\datax-web\\4_1573635586309.log', '94472', '2019-11-13 16:59:46', 200, NULL, NULL);
INSERT INTO `job_log` VALUES (26, 4, 'D:\\temp\\logs\\datax-web\\4_1573635777426.log', '94460', '2019-11-13 17:02:57', 500, NULL, NULL);
INSERT INTO `job_log` VALUES (27, 3, 'D:\\temp\\logs\\datax-web\\3_1573637050199.log', '97044', '2019-11-13 17:24:10', 200, NULL, NULL);
INSERT INTO `job_log` VALUES (28, 3, 'D:\\temp\\logs\\datax-web\\3_1573637194736.log', '89716', '2019-11-13 17:26:35', 200, NULL, NULL);
INSERT INTO `job_log` VALUES (29, 3, 'D:\\temp\\logs\\datax-web\\3_1573638319908.log', '97816', '2019-11-13 17:45:20', 200, NULL, NULL);
INSERT INTO `job_log` VALUES (30, 3, 'D:\\temp\\logs\\datax-web\\3_1573638524001.log', '97888', '2019-11-13 17:48:44', 200, NULL, NULL);
INSERT INTO `job_log` VALUES (31, 3, 'D:\\temp\\logs\\datax-web\\3_1573638646217.log', '97884', '2019-11-13 17:50:46', 500, NULL, NULL);
INSERT INTO `job_log` VALUES (32, 3, 'D:\\temp\\logs\\datax-web\\3_1573638967908.log', '96844', '2019-11-13 17:56:08', 500, NULL, NULL);
INSERT INTO `job_log` VALUES (33, 3, 'D:\\temp\\logs\\datax-web\\3_1573639056074.log', '92228', '2019-11-13 17:57:36', 500, NULL, NULL);
SET FOREIGN_KEY_CHECKS = 1;

View File

@ -0,0 +1,31 @@
package com.wugui.dataxweb.config;
import com.wugui.dataxweb.service.IJobLogService;
import lombok.Getter;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
* datax-web config
*
* @author jingwk 2019-11-10
*/
@Component
@Getter
public class DataXWebConfig implements InitializingBean {
private static DataXWebConfig dataXWebConfig = null;
public static DataXWebConfig getDataXWebConfig() {
return dataXWebConfig;
}
@Override
public void afterPropertiesSet() throws Exception {
dataXWebConfig=this;
}
@Autowired
private IJobLogService jobLogService;
}

View File

@ -27,7 +27,7 @@ public class JobController {
public void testStartJob() {
// 指定获取作业配置json的接口此处用下面mock出来的接口提供
String jobPath = "http://localhost:8080/mock_stream2stream";
iDataxJobService.startJobByJsonStr(jobPath,0L);
iDataxJobService.startJob(jobPath,0L);
}
@GetMapping("/mock_oracle2mongodb")
@ -134,7 +134,7 @@ public class JobController {
@ApiOperation("通过传入json配置启动一个datax作业")
@PostMapping("/runJob")
public R<String> runJob(@RequestBody RunJobDto runJobDto) {
String result = iDataxJobService.startJobByJsonStr(runJobDto.getJobJson(),runJobDto.getJobConfigId());
String result = iDataxJobService.startJob(runJobDto.getJobJson(),runJobDto.getJobConfigId());
return R.ok(result);
}
@ -147,14 +147,14 @@ public class JobController {
@ApiOperation("通过传入 runJobDto 实体启动一个datax作业并记录日志")
@PostMapping("/runJobLog")
public R<String> runJobLog(@RequestBody RunJobDto runJobDto) {
String result = iDataxJobService.startJobLog(runJobDto);
String result = iDataxJobService.startJob(runJobDto.getJobJson(),runJobDto.getJobConfigId());
return R.ok(result);
}
@ApiOperation("通过传入 进程ID 停止该job作业")
@GetMapping("/killJob/{pid}")
public R<Boolean> killJob(@PathVariable(value ="pid") String pid){
return R.ok(iDataxJobService.killJob(pid));
@GetMapping("/killJob/{pid}/{id}")
public R<Boolean> killJob(@PathVariable(value ="pid") String pid,@PathVariable(value = "id") Long id){
return R.ok(iDataxJobService.killJob(pid,id));
}
/**

View File

@ -0,0 +1,103 @@
package com.wugui.dataxweb.controller;
import cn.hutool.core.util.StrUtil;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.baomidou.mybatisplus.extension.api.ApiController;
import com.baomidou.mybatisplus.extension.api.R;
import com.wugui.dataxweb.entity.JobLog;
import com.wugui.dataxweb.log.LogResult;
import com.wugui.dataxweb.service.IJobLogService;
import com.wugui.dataxweb.util.PageUtils;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiImplicitParam;
import io.swagger.annotations.ApiImplicitParams;
import io.swagger.annotations.ApiOperation;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.Map;
/**
* ProcessUtil
*
* @author jingwk
* @version 1.0
* @since 2019/11/10
*/
@RestController
@RequestMapping("log")
@Api(tags = "datax日志接口")
@Slf4j
public class JobLogController extends ApiController {
@Autowired
private IJobLogService iJobLogService;
/**
* 根据日志路径查看详情
*
* @param logFilePath
* @param fromLineNum
* @return
*/
@ApiOperation("查看任务抽取日志,logFilePath为日志路径fromLineNum为读取的行数")
@GetMapping("/view")
public R<LogResult> viewJobLog(String logFilePath, int fromLineNum) {
return R.ok(iJobLogService.viewJobLog(logFilePath, fromLineNum));
}
/**
* 分页查询所有运行数据
*
* @return 列表
*/
@GetMapping("/list")
@ApiOperation("分页查询所有运行数据")
@ApiImplicitParams(
{@ApiImplicitParam(paramType = "query", dataType = "String", name = "current", value = "当前页", defaultValue = "1", required = true),
@ApiImplicitParam(paramType = "query", dataType = "String", name = "size", value = "一页大小", defaultValue = "10", required = true),
@ApiImplicitParam(paramType = "query", dataType = "Boolean", name = "ifCount", value = "是否查询总数", defaultValue = "true"),
@ApiImplicitParam(paramType = "query", dataType = "String", name = "ascs", value = "升序字段,多个用逗号分隔"),
@ApiImplicitParam(paramType = "query", dataType = "String", name = "descs", value = "降序字段,多个用逗号分隔")
})
public R selectAll() {
BaseForm<JobLog> baseForm = new BaseForm();
return success(this.iJobLogService.page(baseForm.getPlusPagingQueryEntity(), pageQueryWrapperCustom(baseForm.getParameters())));
}
/**
* 自定义查询组装
*
* @param map
* @return
*/
private QueryWrapper<JobLog> pageQueryWrapperCustom(Map<String, Object> map) {
// mybatis plus 分页相关的参数
Map<String, Object> pageHelperParams = PageUtils.filterPageParams(map);
log.info("分页相关的参数: {}", pageHelperParams);
//过滤空值分页查询相关的参数
Map<String, Object> columnQueryMap = PageUtils.filterColumnQueryParams(map);
log.info("字段查询条件参数为: {}", columnQueryMap);
QueryWrapper<JobLog> queryWrapper = new QueryWrapper<>();
//排序 操作
pageHelperParams.forEach((k, v) -> {
switch (k) {
case "ascs":
queryWrapper.orderByAsc(StrUtil.toUnderlineCase(StrUtil.toString(v)));
break;
case "descs":
queryWrapper.orderByDesc(StrUtil.toUnderlineCase(StrUtil.toString(v)));
break;
}
});
//遍历进行字段查询条件组装
columnQueryMap.forEach((k, v) -> {
queryWrapper.eq(StrUtil.toUnderlineCase(k), v);
});
return queryWrapper;
}
}

View File

@ -43,19 +43,31 @@ public class JobLog extends Model<JobLog> {
private String logFilePath;
/**
*
*进程Id
*/
@TableField(fill = FieldFill.INSERT_UPDATE)
@JSONField(format = "yyyy/MM/dd")
@ApiModelProperty(value = "", hidden = true)
private Date updateDate;
private String pid;
/**
*
*执行-时间
*/
@TableLogic
@TableField(fill = FieldFill.INSERT)
@JSONField(format = "yyyy/MM/dd")
@ApiModelProperty(value = "", hidden = true)
private Integer status;
private Date handleTime;
/**
*执行-状态
*/
@ApiModelProperty(value = "", hidden = true)
private Integer handleCode;
/**
*执行-日志
*/
@ApiModelProperty(value = "", hidden = true)
private String handleMsg;
/**
*
@ -63,21 +75,6 @@ public class JobLog extends Model<JobLog> {
@ApiModelProperty(value = "", hidden = true)
private Integer createBy;
/**
*
*/
@TableField(fill = FieldFill.INSERT)
@JSONField(format = "yyyy/MM/dd")
@ApiModelProperty(value = "", hidden = true)
private Date createDate;
/**
*
*/
@ApiModelProperty(value = "", hidden = true)
private Integer updateBy;
/**
* 获取主键值
*

View File

@ -0,0 +1,44 @@
package com.wugui.dataxweb.enums;
import com.google.common.collect.Maps;
import java.util.EnumSet;
import java.util.Map;
/**
* @author Jingwk
* @date 2019/11/10
*/
public enum HandleCodeEnum {
SUCCESS(200, "SUCCESS"),
FAIL(500, "FAIL");
private Integer value;
private String desc;
HandleCodeEnum(Integer value, String desc) {
this.value = value;
this.desc = desc;
}
public Integer getValue() {
return value;
}
public String getDesc() {
return desc;
}
public static final Map<Integer, HandleCodeEnum> lookup = Maps.newHashMap();
static {
for (HandleCodeEnum e : EnumSet.allOf(HandleCodeEnum.class)) {
lookup.put(e.value, e);
}
}
public static HandleCodeEnum find(Integer value) {
HandleCodeEnum data = lookup.get(value);
return data;
}
}

View File

@ -1,6 +1,5 @@
package com.wugui.dataxweb.service;
import com.wugui.dataxweb.dto.RunJobDto;
import com.wugui.dataxweb.log.LogResult;
/**
@ -9,18 +8,28 @@ import com.wugui.dataxweb.log.LogResult;
* @create: 2019-06-17 11:25
**/
public interface IDataxJobService {
/**
* 根据json字符串用线程池启动一个datax作业
*
* 启动datax
* @param jobJson
* @author: huzekang
* @Date: 2019-06-17
* @param jobId
* @return
*/
String startJobByJsonStr(String jobJson,Long jobConfigId);
String startJobLog(RunJobDto runJobDto);
String startJob(String jobJson,Long jobId);
/**
* 根据JobId获取最近一次运行日志
* @param id
* @param fromLineNum
* @return
*/
LogResult viewJogLog(Long id, int fromLineNum);
Boolean killJob(String pid);
/**
* 结束datax进程
* @param pid
* @param id
* @return
*/
Boolean killJob(String pid,Long id);
}

View File

@ -2,6 +2,7 @@ package com.wugui.dataxweb.service;
import com.baomidou.mybatisplus.extension.service.IService;
import com.wugui.dataxweb.entity.JobLog;
import com.wugui.dataxweb.log.LogResult;
/**
* 抽取日志记录表表服务接口
@ -11,5 +12,11 @@ import com.wugui.dataxweb.entity.JobLog;
* @since 2019-06-27
*/
public interface IJobLogService extends IService<JobLog> {
/**
* 获取日志详情
* @param logFilePath 日志文件路径
* @param fromLineNum 行数
* @return
*/
LogResult viewJobLog(String logFilePath,int fromLineNum);
}

View File

@ -3,12 +3,10 @@ package com.wugui.dataxweb.service.impl;
import cn.hutool.core.io.FileUtil;
import cn.hutool.core.util.StrUtil;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.baomidou.mybatisplus.core.toolkit.Wrappers;
import com.google.common.collect.Maps;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.wugui.dataxweb.dto.RunJobDto;
import com.wugui.dataxweb.entity.JobLog;
import com.wugui.dataxweb.log.EtlJobFileAppender;
import com.wugui.dataxweb.log.EtlJobLogger;
@ -28,6 +26,7 @@ import java.io.File;
import java.io.FileNotFoundException;
import java.io.PrintWriter;
import java.io.UnsupportedEncodingException;
import java.util.Date;
import java.util.List;
import java.util.concurrent.*;
@ -39,6 +38,7 @@ import java.util.concurrent.*;
@Slf4j
@Service
public class IDataxJobServiceImpl implements IDataxJobService {
private ThreadFactory namedThreadFactory = new ThreadFactoryBuilder().setNameFormat("datax-job-%d").build();
private ExecutorService jobPool = new ThreadPoolExecutor(5, 200, 0L, TimeUnit.MILLISECONDS,
@ -58,7 +58,7 @@ public class IDataxJobServiceImpl implements IDataxJobService {
private IJobLogService jobLogService;
@Override
public String startJobByJsonStr(String jobJson, Long jobConfigId) {
public String startJob(String jobJson, Long jobId) {
jobPool.submit(() -> {
final String tmpFilePath = "jobTmp-" + System.currentTimeMillis() + ".conf";
@ -71,11 +71,11 @@ public class IDataxJobServiceImpl implements IDataxJobService {
try {
Process p = Runtime.getRuntime().exec(new String[]{"python", getDataXPyPath(), tmpFilePath});
String processId = ProcessUtil.getProcessId(p);
log.info("Job:{},Datax运行进程Id:{}", jobConfigId, processId);
EtlJobFileAppender.appendLog(logFilePath, "\n\nJob: " + jobConfigId + ",Datax运行进程Id: " + processId);
jobTmpFiles.put(processId, tmpFilePath);
ExecDataXOutputThread output = new ExecDataXOutputThread(p.getInputStream(), logFilePath, tmpFilePath);
ExecDataXOutputThread error = new ExecDataXOutputThread(p.getErrorStream(), logFilePath, tmpFilePath);
Long id = saveLog(jobId, processId);
EtlJobFileAppender.appendLog(logFilePath, "\n\nJob: " + jobId + ",Datax运行进程Id: " + processId);
ExecDataXOutputThread output = new ExecDataXOutputThread(id, p.getInputStream(), logFilePath, tmpFilePath);
ExecDataXOutputThread error = new ExecDataXOutputThread(id, p.getErrorStream(), logFilePath, tmpFilePath);
output.start();
error.start();
} catch (Exception e) {
@ -86,6 +86,21 @@ public class IDataxJobServiceImpl implements IDataxJobService {
return "success";
}
private Long saveLog(Long jobId, String processId) {
//根据jobId和当前时间戳生成日志文件名
String logFileName = jobId.toString().concat("_").concat(StrUtil.toString(System.currentTimeMillis()).concat(".log"));
logFilePath = etlLogDir.concat(logFileName);
//记录日志
JobLog jobLog = new JobLog();
jobLog.setJobId(jobId);
jobLog.setLogFilePath(logFilePath);
jobLog.setPid(processId);
jobLog.setHandleCode(0);
jobLog.setHandleTime(new Date());
jobLogService.save(jobLog);
return jobLog.getId();
}
private String getDataXPyPath() {
String dataxPyPath;
String dataXHome = System.getenv("DATAX_HOME");
@ -100,27 +115,11 @@ public class IDataxJobServiceImpl implements IDataxJobService {
}
@Override
public String startJobLog(RunJobDto runJobDto) {
//取出 jobJson并转为json对象
JSONObject json = JSONObject.parseObject(runJobDto.getJobJson());
//根据jobId和当前时间戳生成日志文件名
String logFileName = runJobDto.getJobConfigId().toString().concat("_").concat(StrUtil.toString(System.currentTimeMillis()).concat(".log"));
logFilePath = etlLogDir.concat(logFileName);
//记录日志
JobLog jobLog = new JobLog();
jobLog.setJobId(runJobDto.getJobConfigId());
jobLog.setLogFilePath(logFilePath);
jobLogService.save(jobLog);
//启动任务
return startJobByJsonStr(JSON.toJSONString(json), runJobDto.getJobConfigId());
}
@Override
public LogResult viewJogLog(Long id, int fromLineNum) {
QueryWrapper<JobLog> queryWrapper = new QueryWrapper<>();
//根据id获取最新的日志文件路径
queryWrapper.lambda().eq(JobLog::getJobId, id).orderByDesc(JobLog::getCreateDate);
queryWrapper.lambda().eq(JobLog::getJobId, id).orderByDesc(JobLog::getHandleTime);
List<JobLog> list = jobLogService.list(queryWrapper);
//取最新的一条记录
if (list.isEmpty()) {
@ -132,7 +131,7 @@ public class IDataxJobServiceImpl implements IDataxJobService {
}
@Override
public Boolean killJob(String pid) {
public Boolean killJob(String pid, Long id) {
boolean result = ProcessUtil.killProcessByPid(pid);
// 删除临时文件
if (!CollectionUtils.isEmpty(jobTmpFiles)) {
@ -141,6 +140,7 @@ public class IDataxJobServiceImpl implements IDataxJobService {
FileUtil.del(new File(pathname));
}
}
jobLogService.update(null, Wrappers.<JobLog>lambdaUpdate().set(JobLog::getHandleMsg, "job runningkilled").set(JobLog::getHandleCode, 500).eq(JobLog::getId, id));
return result;
}

View File

@ -3,10 +3,14 @@ package com.wugui.dataxweb.service.impl;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.wugui.dataxweb.dao.JobLogMapper;
import com.wugui.dataxweb.entity.JobLog;
import com.wugui.dataxweb.log.EtlJobLogger;
import com.wugui.dataxweb.log.LogResult;
import com.wugui.dataxweb.service.IJobLogService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
/**
* 抽取日志记录表表服务实现类
*
@ -18,4 +22,11 @@ import org.springframework.transaction.annotation.Transactional;
@Transactional(readOnly = true)
public class JobLogServiceImpl extends ServiceImpl<JobLogMapper, JobLog> implements IJobLogService {
@Autowired
private IJobLogService jobLogService;
@Override
public LogResult viewJobLog(String logFilePath,int fromLineNum) {
return EtlJobLogger.readLog(logFilePath, fromLineNum);
}
}

View File

@ -1,6 +1,10 @@
package com.wugui.dataxweb.thread;
import cn.hutool.core.io.FileUtil;
import com.baomidou.mybatisplus.core.toolkit.Wrappers;
import com.wugui.dataxweb.config.DataXWebConfig;
import com.wugui.dataxweb.entity.JobLog;
import com.wugui.dataxweb.enums.HandleCodeEnum;
import com.wugui.dataxweb.log.EtlJobFileAppender;
import lombok.extern.slf4j.Slf4j;
@ -21,8 +25,10 @@ public class ExecDataXOutputThread extends Thread {
private InputStream is;
private String logFilePath;
private String tmpFilePath;
private Long id;
public ExecDataXOutputThread(InputStream is, String logFilePath, String tmpFilePath) {
public ExecDataXOutputThread(Long id, InputStream is, String logFilePath, String tmpFilePath) {
this.id = id;
this.is = is;
this.logFilePath = logFilePath;
this.tmpFilePath = tmpFilePath;
@ -33,12 +39,17 @@ public class ExecDataXOutputThread extends Thread {
InputStreamReader isr = new InputStreamReader(is, StandardCharsets.UTF_8);
BufferedReader br = new BufferedReader(isr);
String line = null;
int handleCode = HandleCodeEnum.SUCCESS.getValue();
while ((line = br.readLine()) != null) {
EtlJobFileAppender.appendLog(logFilePath, line);
if (handleCode != HandleCodeEnum.FAIL.getValue() && line.contains("Exception")) {
handleCode = HandleCodeEnum.FAIL.getValue();
}
log.info(line);
}
// 删除临时文件
FileUtil.del(new File(tmpFilePath));
DataXWebConfig.getDataXWebConfig().getJobLogService().update(null, Wrappers.<JobLog>lambdaUpdate().set(JobLog::getHandleCode, handleCode).eq(JobLog::getId, id));
} catch (IOException e) {
log.error("DataX 执行异常:{0}", e);
}