From a5bb7fdf3371ba6355b7438e131fdf8ffd509552 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=99=AF=E6=96=87=E5=87=AF?= Date: Thu, 14 Nov 2019 08:59:48 +0800 Subject: [PATCH] =?UTF-8?q?=E6=96=B0=E5=A2=9Ejob=E8=BF=90=E8=A1=8C?= =?UTF-8?q?=E8=AE=B0=E5=BD=95=E6=8E=A5=E5=8F=A3=20=E6=96=B0=E5=A2=9Ejob?= =?UTF-8?q?=E8=BF=90=E8=A1=8C=E7=8A=B6=E6=80=81=E6=9B=B4=E6=96=B0=E5=8A=9F?= =?UTF-8?q?=E8=83=BD=20=E4=BB=8Edatax=E6=97=A5=E5=BF=97=E4=B8=AD=E8=BF=87?= =?UTF-8?q?=E6=BB=A4=E5=BC=82=E5=B8=B8=E5=B9=B6=E6=9B=B4=E6=96=B0job?= =?UTF-8?q?=E7=8A=B6=E6=80=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- datax-web/db/datax_web.sql | 57 +++++----- .../wugui/dataxweb/config/DataXWebConfig.java | 31 ++++++ .../dataxweb/controller/JobController.java | 12 +- .../dataxweb/controller/JobLogController.java | 103 ++++++++++++++++++ .../com/wugui/dataxweb/entity/JobLog.java | 41 ++++--- .../wugui/dataxweb/enums/HandleCodeEnum.java | 44 ++++++++ .../dataxweb/service/IDataxJobService.java | 27 +++-- .../dataxweb/service/IJobLogService.java | 9 +- .../service/impl/IDataxJobServiceImpl.java | 52 ++++----- .../service/impl/JobLogServiceImpl.java | 11 ++ .../thread/ExecDataXOutputThread.java | 13 ++- 11 files changed, 305 insertions(+), 95 deletions(-) create mode 100644 datax-web/src/main/java/com/wugui/dataxweb/config/DataXWebConfig.java create mode 100644 datax-web/src/main/java/com/wugui/dataxweb/controller/JobLogController.java create mode 100644 datax-web/src/main/java/com/wugui/dataxweb/enums/HandleCodeEnum.java diff --git a/datax-web/db/datax_web.sql b/datax-web/db/datax_web.sql index 88577754..f344b1d0 100644 --- a/datax-web/db/datax_web.sql +++ b/datax-web/db/datax_web.sql @@ -105,41 +105,38 @@ CREATE TABLE `job_log` ( `id` bigint(20) NOT NULL AUTO_INCREMENT, `job_id` bigint(20) NOT NULL COMMENT '抽取任务,主键ID', `log_file_path` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL COMMENT '日志文件路径', - `update_date` datetime(0) NULL DEFAULT NULL, - `status` int(1) NULL DEFAULT 1, + `pid` varchar(20) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL COMMENT '任务进程ID', + `handle_time` datetime(0) NULL DEFAULT NULL COMMENT '执行-时间', + `handle_code` int(11) NULL DEFAULT 0 COMMENT '执行-状态', + `handle_msg` text CHARACTER SET utf8 COLLATE utf8_general_ci NULL COMMENT '执行-日志', `create_by` int(11) NULL DEFAULT NULL, - `create_date` datetime(0) NOT NULL DEFAULT CURRENT_TIMESTAMP(0) ON UPDATE CURRENT_TIMESTAMP(0), - `update_by` int(11) NULL DEFAULT NULL, PRIMARY KEY (`id`) USING BTREE -) ENGINE = InnoDB AUTO_INCREMENT = 26 CHARACTER SET = utf8 COLLATE = utf8_general_ci COMMENT = '抽取日志记录表' ROW_FORMAT = Dynamic; +) ENGINE = InnoDB AUTO_INCREMENT = 0 CHARACTER SET = utf8 COLLATE = utf8_general_ci COMMENT = '抽取日志记录表' ROW_FORMAT = Dynamic; -- ---------------------------- -- Records of job_log -- ---------------------------- -INSERT INTO `job_log` VALUES (1, 1, '/data/applogs/datax-web/1_1561638145701', NULL, 1, NULL, '2019-06-27 08:22:26', NULL); -INSERT INTO `job_log` VALUES (2, 1, '/data/applogs/datax-web/1_1561638359975.log', NULL, 1, NULL, '2019-06-27 08:26:00', NULL); -INSERT INTO `job_log` VALUES (3, 1, '/data/applogs/datax-web/1_1561638760584.log', NULL, 1, NULL, '2019-06-27 08:32:41', NULL); -INSERT INTO `job_log` VALUES (4, 4, '/data/applogs/datax-web/4_1564480870563.log', '2019-07-30 18:01:11', 1, NULL, '2019-07-30 18:01:11', NULL); -INSERT INTO `job_log` VALUES (5, 3, '/data/applogs/datax-web/3_1564481419699.log', '2019-07-30 18:10:20', 1, NULL, '2019-07-30 18:10:20', NULL); -INSERT INTO `job_log` VALUES (6, 3, '/data/applogs/datax-web/3_1564485764734.log', '2019-07-30 19:22:45', 1, NULL, '2019-07-30 19:22:45', NULL); -INSERT INTO `job_log` VALUES (7, 3, '/data/applogs/datax-web/3_1564485918860.log', '2019-07-30 19:25:19', 1, NULL, '2019-07-30 19:25:19', NULL); -INSERT INTO `job_log` VALUES (8, 3, '/data/applogs/datax-web/3_1564486087223.log', '2019-07-30 19:28:07', 1, NULL, '2019-07-30 19:28:07', NULL); -INSERT INTO `job_log` VALUES (9, 3, '/data/applogs/datax-web/3_1564486152278.log', '2019-07-30 19:29:12', 1, NULL, '2019-07-30 19:29:12', NULL); -INSERT INTO `job_log` VALUES (10, 3, '/data/applogs/datax-web/3_1564486351631.log', '2019-07-30 19:32:32', 1, NULL, '2019-07-30 19:32:32', NULL); -INSERT INTO `job_log` VALUES (11, 3, '/data/applogs/datax-web/3_1564486375214.log', '2019-07-30 19:32:55', 1, NULL, '2019-07-30 19:32:55', NULL); -INSERT INTO `job_log` VALUES (12, 3, '/data/applogs/datax-web/3_1564486398393.log', '2019-07-30 19:33:18', 1, NULL, '2019-07-30 19:33:18', NULL); -INSERT INTO `job_log` VALUES (13, 3, '/data/applogs/datax-web/3_1564488007122.log', '2019-07-30 20:00:07', 1, NULL, '2019-07-30 20:00:07', NULL); -INSERT INTO `job_log` VALUES (14, 3, '/data/applogs/datax-web/3_1564489795800.log', '2019-07-30 20:29:56', 1, NULL, '2019-07-30 20:29:56', NULL); -INSERT INTO `job_log` VALUES (15, 3, '/data/applogs/datax-web/3_1564490723427.log', '2019-07-30 20:45:23', 1, NULL, '2019-07-30 20:45:23', NULL); -INSERT INTO `job_log` VALUES (16, 3, '/data/applogs/datax-web/3_1564490843863.log', '2019-07-30 20:47:24', 1, NULL, '2019-07-30 20:47:24', NULL); -INSERT INTO `job_log` VALUES (17, 3, '/data/applogs/datax-web/3_1564491445033.log', '2019-07-30 20:57:25', 1, NULL, '2019-07-30 20:57:25', NULL); -INSERT INTO `job_log` VALUES (18, 3, '/data/applogs/datax-web/3_1564491868935.log', '2019-07-30 21:04:29', 1, NULL, '2019-07-30 21:04:29', NULL); -INSERT INTO `job_log` VALUES (19, 3, '/data/applogs/datax-web/3_1564492047112.log', '2019-07-30 21:07:27', 1, NULL, '2019-07-30 21:07:27', NULL); -INSERT INTO `job_log` VALUES (20, 3, '/data/applogs/datax-web/3_1564492173290.log', '2019-07-30 21:09:33', 1, NULL, '2019-07-30 21:09:33', NULL); -INSERT INTO `job_log` VALUES (21, 3, '/data/applogs/datax-web/3_1564492308532.log', '2019-07-30 21:11:49', 1, NULL, '2019-07-30 21:11:49', NULL); -INSERT INTO `job_log` VALUES (22, 3, '/data/applogs/datax-web/3_1564492378732.log', '2019-07-30 21:12:59', 1, NULL, '2019-07-30 21:12:59', NULL); -INSERT INTO `job_log` VALUES (23, 3, '/data/applogs/datax-web/3_1564495260737.log', '2019-07-30 22:01:01', 1, NULL, '2019-07-30 22:01:01', NULL); -INSERT INTO `job_log` VALUES (24, 3, '/data/applogs/datax-web/3_1564574180078.log', '2019-07-31 19:56:20', 1, NULL, '2019-07-31 19:56:20', NULL); -INSERT INTO `job_log` VALUES (25, 3, '/data/applogs/datax-web/3_1564574319878.log', '2019-07-31 19:58:40', 1, NULL, '2019-07-31 19:58:40', NULL); +INSERT INTO `job_log` VALUES (12, 4, 'D:\\temp\\logs\\datax-web\\4_1573628574883.log', '88612', '2019-11-13 15:02:55', 0, NULL, NULL); +INSERT INTO `job_log` VALUES (13, 4, 'D:\\temp\\logs\\datax-web\\4_1573628575201.log', '88772', '2019-11-13 15:02:55', 0, NULL, NULL); +INSERT INTO `job_log` VALUES (14, 4, 'D:\\temp\\logs\\datax-web\\4_1573628576097.log', '88984', '2019-11-13 15:02:56', 0, NULL, NULL); +INSERT INTO `job_log` VALUES (15, 4, 'D:\\temp\\logs\\datax-web\\4_1573628589106.log', '89224', '2019-11-13 15:03:09', 0, NULL, NULL); +INSERT INTO `job_log` VALUES (16, 4, 'D:\\temp\\logs\\datax-web\\4_1573628589293.log', '89364', '2019-11-13 15:03:09', 0, NULL, NULL); +INSERT INTO `job_log` VALUES (17, 4, 'D:\\temp\\logs\\datax-web\\4_1573628589509.log', '89520', '2019-11-13 15:03:10', 0, NULL, NULL); +INSERT INTO `job_log` VALUES (18, 4, 'D:\\temp\\logs\\datax-web\\4_1573628589670.log', '89652', '2019-11-13 15:03:10', 0, NULL, NULL); +INSERT INTO `job_log` VALUES (19, 4, 'D:\\temp\\logs\\datax-web\\4_1573628589859.log', '89720', '2019-11-13 15:03:10', 0, NULL, NULL); +INSERT INTO `job_log` VALUES (20, 4, 'D:\\temp\\logs\\datax-web\\4_1573628590201.log', '89832', '2019-11-13 15:03:10', 0, NULL, NULL); +INSERT INTO `job_log` VALUES (21, 4, 'D:\\temp\\logs\\datax-web\\4_1573628590325.log', '89912', '2019-11-13 15:03:10', 0, NULL, NULL); +INSERT INTO `job_log` VALUES (22, 6, 'D:\\temp\\logs\\datax-web\\6_1573630200227.log', '18352', '2019-11-13 15:30:00', 0, NULL, NULL); +INSERT INTO `job_log` VALUES (23, 6, 'D:\\temp\\logs\\datax-web\\6_1573630558990.log', '74672', '2019-11-13 15:35:59', 0, NULL, NULL); +INSERT INTO `job_log` VALUES (24, 6, 'D:\\temp\\logs\\datax-web\\6_1573635226093.log', '93744', '2019-11-13 16:53:46', 0, NULL, NULL); +INSERT INTO `job_log` VALUES (25, 4, 'D:\\temp\\logs\\datax-web\\4_1573635586309.log', '94472', '2019-11-13 16:59:46', 200, NULL, NULL); +INSERT INTO `job_log` VALUES (26, 4, 'D:\\temp\\logs\\datax-web\\4_1573635777426.log', '94460', '2019-11-13 17:02:57', 500, NULL, NULL); +INSERT INTO `job_log` VALUES (27, 3, 'D:\\temp\\logs\\datax-web\\3_1573637050199.log', '97044', '2019-11-13 17:24:10', 200, NULL, NULL); +INSERT INTO `job_log` VALUES (28, 3, 'D:\\temp\\logs\\datax-web\\3_1573637194736.log', '89716', '2019-11-13 17:26:35', 200, NULL, NULL); +INSERT INTO `job_log` VALUES (29, 3, 'D:\\temp\\logs\\datax-web\\3_1573638319908.log', '97816', '2019-11-13 17:45:20', 200, NULL, NULL); +INSERT INTO `job_log` VALUES (30, 3, 'D:\\temp\\logs\\datax-web\\3_1573638524001.log', '97888', '2019-11-13 17:48:44', 200, NULL, NULL); +INSERT INTO `job_log` VALUES (31, 3, 'D:\\temp\\logs\\datax-web\\3_1573638646217.log', '97884', '2019-11-13 17:50:46', 500, NULL, NULL); +INSERT INTO `job_log` VALUES (32, 3, 'D:\\temp\\logs\\datax-web\\3_1573638967908.log', '96844', '2019-11-13 17:56:08', 500, NULL, NULL); +INSERT INTO `job_log` VALUES (33, 3, 'D:\\temp\\logs\\datax-web\\3_1573639056074.log', '92228', '2019-11-13 17:57:36', 500, NULL, NULL); SET FOREIGN_KEY_CHECKS = 1; diff --git a/datax-web/src/main/java/com/wugui/dataxweb/config/DataXWebConfig.java b/datax-web/src/main/java/com/wugui/dataxweb/config/DataXWebConfig.java new file mode 100644 index 00000000..db20ee6b --- /dev/null +++ b/datax-web/src/main/java/com/wugui/dataxweb/config/DataXWebConfig.java @@ -0,0 +1,31 @@ +package com.wugui.dataxweb.config; + +import com.wugui.dataxweb.service.IJobLogService; +import lombok.Getter; +import org.springframework.beans.factory.InitializingBean; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + + +/** + * datax-web config + * + * @author jingwk 2019-11-10 + */ +@Component +@Getter +public class DataXWebConfig implements InitializingBean { + private static DataXWebConfig dataXWebConfig = null; + + public static DataXWebConfig getDataXWebConfig() { + return dataXWebConfig; + } + + @Override + public void afterPropertiesSet() throws Exception { + dataXWebConfig=this; + } + + @Autowired + private IJobLogService jobLogService; +} diff --git a/datax-web/src/main/java/com/wugui/dataxweb/controller/JobController.java b/datax-web/src/main/java/com/wugui/dataxweb/controller/JobController.java index 183ef724..60295ed8 100644 --- a/datax-web/src/main/java/com/wugui/dataxweb/controller/JobController.java +++ b/datax-web/src/main/java/com/wugui/dataxweb/controller/JobController.java @@ -27,7 +27,7 @@ public class JobController { public void testStartJob() { // 指定获取作业配置json的接口,此处用下面mock出来的接口提供 String jobPath = "http://localhost:8080/mock_stream2stream"; - iDataxJobService.startJobByJsonStr(jobPath,0L); + iDataxJobService.startJob(jobPath,0L); } @GetMapping("/mock_oracle2mongodb") @@ -134,7 +134,7 @@ public class JobController { @ApiOperation("通过传入json配置启动一个datax作业") @PostMapping("/runJob") public R runJob(@RequestBody RunJobDto runJobDto) { - String result = iDataxJobService.startJobByJsonStr(runJobDto.getJobJson(),runJobDto.getJobConfigId()); + String result = iDataxJobService.startJob(runJobDto.getJobJson(),runJobDto.getJobConfigId()); return R.ok(result); } @@ -147,14 +147,14 @@ public class JobController { @ApiOperation("通过传入 runJobDto 实体启动一个datax作业,并记录日志") @PostMapping("/runJobLog") public R runJobLog(@RequestBody RunJobDto runJobDto) { - String result = iDataxJobService.startJobLog(runJobDto); + String result = iDataxJobService.startJob(runJobDto.getJobJson(),runJobDto.getJobConfigId()); return R.ok(result); } @ApiOperation("通过传入 进程ID 停止该job作业") - @GetMapping("/killJob/{pid}") - public R killJob(@PathVariable(value ="pid") String pid){ - return R.ok(iDataxJobService.killJob(pid)); + @GetMapping("/killJob/{pid}/{id}") + public R killJob(@PathVariable(value ="pid") String pid,@PathVariable(value = "id") Long id){ + return R.ok(iDataxJobService.killJob(pid,id)); } /** diff --git a/datax-web/src/main/java/com/wugui/dataxweb/controller/JobLogController.java b/datax-web/src/main/java/com/wugui/dataxweb/controller/JobLogController.java new file mode 100644 index 00000000..c494a1f8 --- /dev/null +++ b/datax-web/src/main/java/com/wugui/dataxweb/controller/JobLogController.java @@ -0,0 +1,103 @@ +package com.wugui.dataxweb.controller; + +import cn.hutool.core.util.StrUtil; +import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper; +import com.baomidou.mybatisplus.extension.api.ApiController; +import com.baomidou.mybatisplus.extension.api.R; +import com.wugui.dataxweb.entity.JobLog; +import com.wugui.dataxweb.log.LogResult; +import com.wugui.dataxweb.service.IJobLogService; +import com.wugui.dataxweb.util.PageUtils; +import io.swagger.annotations.Api; +import io.swagger.annotations.ApiImplicitParam; +import io.swagger.annotations.ApiImplicitParams; +import io.swagger.annotations.ApiOperation; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RestController; + +import java.util.Map; + +/** + * ProcessUtil + * + * @author jingwk + * @version 1.0 + * @since 2019/11/10 + */ + +@RestController +@RequestMapping("log") +@Api(tags = "datax日志接口") +@Slf4j +public class JobLogController extends ApiController { + + @Autowired + private IJobLogService iJobLogService; + + /** + * 根据日志路径查看详情 + * + * @param logFilePath + * @param fromLineNum + * @return + */ + @ApiOperation("查看任务抽取日志,logFilePath为日志路径,fromLineNum为读取的行数") + @GetMapping("/view") + public R viewJobLog(String logFilePath, int fromLineNum) { + return R.ok(iJobLogService.viewJobLog(logFilePath, fromLineNum)); + } + + /** + * 分页查询所有运行数据 + * + * @return 列表 + */ + @GetMapping("/list") + @ApiOperation("分页查询所有运行数据") + @ApiImplicitParams( + {@ApiImplicitParam(paramType = "query", dataType = "String", name = "current", value = "当前页", defaultValue = "1", required = true), + @ApiImplicitParam(paramType = "query", dataType = "String", name = "size", value = "一页大小", defaultValue = "10", required = true), + @ApiImplicitParam(paramType = "query", dataType = "Boolean", name = "ifCount", value = "是否查询总数", defaultValue = "true"), + @ApiImplicitParam(paramType = "query", dataType = "String", name = "ascs", value = "升序字段,多个用逗号分隔"), + @ApiImplicitParam(paramType = "query", dataType = "String", name = "descs", value = "降序字段,多个用逗号分隔") + }) + public R selectAll() { + BaseForm baseForm = new BaseForm(); + return success(this.iJobLogService.page(baseForm.getPlusPagingQueryEntity(), pageQueryWrapperCustom(baseForm.getParameters()))); + } + + /** + * 自定义查询组装 + * + * @param map + * @return + */ + private QueryWrapper pageQueryWrapperCustom(Map map) { + // mybatis plus 分页相关的参数 + Map pageHelperParams = PageUtils.filterPageParams(map); + log.info("分页相关的参数: {}", pageHelperParams); + //过滤空值,分页查询相关的参数 + Map columnQueryMap = PageUtils.filterColumnQueryParams(map); + log.info("字段查询条件参数为: {}", columnQueryMap); + QueryWrapper queryWrapper = new QueryWrapper<>(); + //排序 操作 + pageHelperParams.forEach((k, v) -> { + switch (k) { + case "ascs": + queryWrapper.orderByAsc(StrUtil.toUnderlineCase(StrUtil.toString(v))); + break; + case "descs": + queryWrapper.orderByDesc(StrUtil.toUnderlineCase(StrUtil.toString(v))); + break; + } + }); + //遍历进行字段查询条件组装 + columnQueryMap.forEach((k, v) -> { + queryWrapper.eq(StrUtil.toUnderlineCase(k), v); + }); + return queryWrapper; + } +} diff --git a/datax-web/src/main/java/com/wugui/dataxweb/entity/JobLog.java b/datax-web/src/main/java/com/wugui/dataxweb/entity/JobLog.java index 732ee74e..ac3270fb 100644 --- a/datax-web/src/main/java/com/wugui/dataxweb/entity/JobLog.java +++ b/datax-web/src/main/java/com/wugui/dataxweb/entity/JobLog.java @@ -43,19 +43,31 @@ public class JobLog extends Model { private String logFilePath; /** - * + *进程Id */ - @TableField(fill = FieldFill.INSERT_UPDATE) - @JSONField(format = "yyyy/MM/dd") @ApiModelProperty(value = "", hidden = true) - private Date updateDate; + private String pid; /** - * + *执行-时间 */ - @TableLogic + @TableField(fill = FieldFill.INSERT) + @JSONField(format = "yyyy/MM/dd") @ApiModelProperty(value = "", hidden = true) - private Integer status; + private Date handleTime; + + /** + *执行-状态 + */ + @ApiModelProperty(value = "", hidden = true) + private Integer handleCode; + + + /** + *执行-日志 + */ + @ApiModelProperty(value = "", hidden = true) + private String handleMsg; /** * @@ -63,21 +75,6 @@ public class JobLog extends Model { @ApiModelProperty(value = "", hidden = true) private Integer createBy; - /** - * - */ - @TableField(fill = FieldFill.INSERT) - @JSONField(format = "yyyy/MM/dd") - @ApiModelProperty(value = "", hidden = true) - private Date createDate; - - /** - * - */ - @ApiModelProperty(value = "", hidden = true) - private Integer updateBy; - - /** * 获取主键值 * diff --git a/datax-web/src/main/java/com/wugui/dataxweb/enums/HandleCodeEnum.java b/datax-web/src/main/java/com/wugui/dataxweb/enums/HandleCodeEnum.java new file mode 100644 index 00000000..6c0f3f34 --- /dev/null +++ b/datax-web/src/main/java/com/wugui/dataxweb/enums/HandleCodeEnum.java @@ -0,0 +1,44 @@ +package com.wugui.dataxweb.enums; + +import com.google.common.collect.Maps; + +import java.util.EnumSet; +import java.util.Map; + +/** + * @author Jingwk + * @date 2019/11/10 + */ +public enum HandleCodeEnum { + SUCCESS(200, "SUCCESS"), + FAIL(500, "FAIL"); + + private Integer value; + private String desc; + + HandleCodeEnum(Integer value, String desc) { + this.value = value; + this.desc = desc; + } + + public Integer getValue() { + return value; + } + + public String getDesc() { + return desc; + } + + public static final Map lookup = Maps.newHashMap(); + + static { + for (HandleCodeEnum e : EnumSet.allOf(HandleCodeEnum.class)) { + lookup.put(e.value, e); + } + } + + public static HandleCodeEnum find(Integer value) { + HandleCodeEnum data = lookup.get(value); + return data; + } +} diff --git a/datax-web/src/main/java/com/wugui/dataxweb/service/IDataxJobService.java b/datax-web/src/main/java/com/wugui/dataxweb/service/IDataxJobService.java index 8966cffd..1fa1e1fa 100644 --- a/datax-web/src/main/java/com/wugui/dataxweb/service/IDataxJobService.java +++ b/datax-web/src/main/java/com/wugui/dataxweb/service/IDataxJobService.java @@ -1,6 +1,5 @@ package com.wugui.dataxweb.service; -import com.wugui.dataxweb.dto.RunJobDto; import com.wugui.dataxweb.log.LogResult; /** @@ -9,18 +8,28 @@ import com.wugui.dataxweb.log.LogResult; * @create: 2019-06-17 11:25 **/ public interface IDataxJobService { + /** - * 根据json字符串用线程池启动一个datax作业 - * + * 启动datax * @param jobJson - * @author: huzekang - * @Date: 2019-06-17 + * @param jobId + * @return */ - String startJobByJsonStr(String jobJson,Long jobConfigId); - - String startJobLog(RunJobDto runJobDto); + String startJob(String jobJson,Long jobId); + /** + * 根据JobId获取最近一次运行日志 + * @param id + * @param fromLineNum + * @return + */ LogResult viewJogLog(Long id, int fromLineNum); - Boolean killJob(String pid); + /** + * 结束datax进程 + * @param pid + * @param id + * @return + */ + Boolean killJob(String pid,Long id); } diff --git a/datax-web/src/main/java/com/wugui/dataxweb/service/IJobLogService.java b/datax-web/src/main/java/com/wugui/dataxweb/service/IJobLogService.java index c5505acf..87e48b34 100644 --- a/datax-web/src/main/java/com/wugui/dataxweb/service/IJobLogService.java +++ b/datax-web/src/main/java/com/wugui/dataxweb/service/IJobLogService.java @@ -2,6 +2,7 @@ package com.wugui.dataxweb.service; import com.baomidou.mybatisplus.extension.service.IService; import com.wugui.dataxweb.entity.JobLog; +import com.wugui.dataxweb.log.LogResult; /** * 抽取日志记录表表服务接口 @@ -11,5 +12,11 @@ import com.wugui.dataxweb.entity.JobLog; * @since 2019-06-27 */ public interface IJobLogService extends IService { - + /** + * 获取日志详情 + * @param logFilePath 日志文件路径 + * @param fromLineNum 行数 + * @return + */ + LogResult viewJobLog(String logFilePath,int fromLineNum); } \ No newline at end of file diff --git a/datax-web/src/main/java/com/wugui/dataxweb/service/impl/IDataxJobServiceImpl.java b/datax-web/src/main/java/com/wugui/dataxweb/service/impl/IDataxJobServiceImpl.java index 47b25630..64a567da 100644 --- a/datax-web/src/main/java/com/wugui/dataxweb/service/impl/IDataxJobServiceImpl.java +++ b/datax-web/src/main/java/com/wugui/dataxweb/service/impl/IDataxJobServiceImpl.java @@ -3,12 +3,10 @@ package com.wugui.dataxweb.service.impl; import cn.hutool.core.io.FileUtil; import cn.hutool.core.util.StrUtil; -import com.alibaba.fastjson.JSON; -import com.alibaba.fastjson.JSONObject; import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper; +import com.baomidou.mybatisplus.core.toolkit.Wrappers; import com.google.common.collect.Maps; import com.google.common.util.concurrent.ThreadFactoryBuilder; -import com.wugui.dataxweb.dto.RunJobDto; import com.wugui.dataxweb.entity.JobLog; import com.wugui.dataxweb.log.EtlJobFileAppender; import com.wugui.dataxweb.log.EtlJobLogger; @@ -28,6 +26,7 @@ import java.io.File; import java.io.FileNotFoundException; import java.io.PrintWriter; import java.io.UnsupportedEncodingException; +import java.util.Date; import java.util.List; import java.util.concurrent.*; @@ -39,6 +38,7 @@ import java.util.concurrent.*; @Slf4j @Service public class IDataxJobServiceImpl implements IDataxJobService { + private ThreadFactory namedThreadFactory = new ThreadFactoryBuilder().setNameFormat("datax-job-%d").build(); private ExecutorService jobPool = new ThreadPoolExecutor(5, 200, 0L, TimeUnit.MILLISECONDS, @@ -58,7 +58,7 @@ public class IDataxJobServiceImpl implements IDataxJobService { private IJobLogService jobLogService; @Override - public String startJobByJsonStr(String jobJson, Long jobConfigId) { + public String startJob(String jobJson, Long jobId) { jobPool.submit(() -> { final String tmpFilePath = "jobTmp-" + System.currentTimeMillis() + ".conf"; @@ -71,11 +71,11 @@ public class IDataxJobServiceImpl implements IDataxJobService { try { Process p = Runtime.getRuntime().exec(new String[]{"python", getDataXPyPath(), tmpFilePath}); String processId = ProcessUtil.getProcessId(p); - log.info("Job:{},Datax运行进程Id:{}", jobConfigId, processId); - EtlJobFileAppender.appendLog(logFilePath, "\n\nJob: " + jobConfigId + ",Datax运行进程Id: " + processId); jobTmpFiles.put(processId, tmpFilePath); - ExecDataXOutputThread output = new ExecDataXOutputThread(p.getInputStream(), logFilePath, tmpFilePath); - ExecDataXOutputThread error = new ExecDataXOutputThread(p.getErrorStream(), logFilePath, tmpFilePath); + Long id = saveLog(jobId, processId); + EtlJobFileAppender.appendLog(logFilePath, "\n\nJob: " + jobId + ",Datax运行进程Id: " + processId); + ExecDataXOutputThread output = new ExecDataXOutputThread(id, p.getInputStream(), logFilePath, tmpFilePath); + ExecDataXOutputThread error = new ExecDataXOutputThread(id, p.getErrorStream(), logFilePath, tmpFilePath); output.start(); error.start(); } catch (Exception e) { @@ -86,6 +86,21 @@ public class IDataxJobServiceImpl implements IDataxJobService { return "success"; } + private Long saveLog(Long jobId, String processId) { + //根据jobId和当前时间戳生成日志文件名 + String logFileName = jobId.toString().concat("_").concat(StrUtil.toString(System.currentTimeMillis()).concat(".log")); + logFilePath = etlLogDir.concat(logFileName); + //记录日志 + JobLog jobLog = new JobLog(); + jobLog.setJobId(jobId); + jobLog.setLogFilePath(logFilePath); + jobLog.setPid(processId); + jobLog.setHandleCode(0); + jobLog.setHandleTime(new Date()); + jobLogService.save(jobLog); + return jobLog.getId(); + } + private String getDataXPyPath() { String dataxPyPath; String dataXHome = System.getenv("DATAX_HOME"); @@ -100,27 +115,11 @@ public class IDataxJobServiceImpl implements IDataxJobService { } - @Override - public String startJobLog(RunJobDto runJobDto) { - //取出 jobJson,并转为json对象 - JSONObject json = JSONObject.parseObject(runJobDto.getJobJson()); - //根据jobId和当前时间戳生成日志文件名 - String logFileName = runJobDto.getJobConfigId().toString().concat("_").concat(StrUtil.toString(System.currentTimeMillis()).concat(".log")); - logFilePath = etlLogDir.concat(logFileName); - //记录日志 - JobLog jobLog = new JobLog(); - jobLog.setJobId(runJobDto.getJobConfigId()); - jobLog.setLogFilePath(logFilePath); - jobLogService.save(jobLog); - //启动任务 - return startJobByJsonStr(JSON.toJSONString(json), runJobDto.getJobConfigId()); - } - @Override public LogResult viewJogLog(Long id, int fromLineNum) { QueryWrapper queryWrapper = new QueryWrapper<>(); //根据id获取最新的日志文件路径 - queryWrapper.lambda().eq(JobLog::getJobId, id).orderByDesc(JobLog::getCreateDate); + queryWrapper.lambda().eq(JobLog::getJobId, id).orderByDesc(JobLog::getHandleTime); List list = jobLogService.list(queryWrapper); //取最新的一条记录 if (list.isEmpty()) { @@ -132,7 +131,7 @@ public class IDataxJobServiceImpl implements IDataxJobService { } @Override - public Boolean killJob(String pid) { + public Boolean killJob(String pid, Long id) { boolean result = ProcessUtil.killProcessByPid(pid); // 删除临时文件 if (!CollectionUtils.isEmpty(jobTmpFiles)) { @@ -141,6 +140,7 @@ public class IDataxJobServiceImpl implements IDataxJobService { FileUtil.del(new File(pathname)); } } + jobLogService.update(null, Wrappers.lambdaUpdate().set(JobLog::getHandleMsg, "job running,killed").set(JobLog::getHandleCode, 500).eq(JobLog::getId, id)); return result; } diff --git a/datax-web/src/main/java/com/wugui/dataxweb/service/impl/JobLogServiceImpl.java b/datax-web/src/main/java/com/wugui/dataxweb/service/impl/JobLogServiceImpl.java index a74fccbe..4f063ef9 100644 --- a/datax-web/src/main/java/com/wugui/dataxweb/service/impl/JobLogServiceImpl.java +++ b/datax-web/src/main/java/com/wugui/dataxweb/service/impl/JobLogServiceImpl.java @@ -3,10 +3,14 @@ package com.wugui.dataxweb.service.impl; import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; import com.wugui.dataxweb.dao.JobLogMapper; import com.wugui.dataxweb.entity.JobLog; +import com.wugui.dataxweb.log.EtlJobLogger; +import com.wugui.dataxweb.log.LogResult; import com.wugui.dataxweb.service.IJobLogService; +import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; + /** * 抽取日志记录表表服务实现类 * @@ -18,4 +22,11 @@ import org.springframework.transaction.annotation.Transactional; @Transactional(readOnly = true) public class JobLogServiceImpl extends ServiceImpl implements IJobLogService { + @Autowired + private IJobLogService jobLogService; + + @Override + public LogResult viewJobLog(String logFilePath,int fromLineNum) { + return EtlJobLogger.readLog(logFilePath, fromLineNum); + } } \ No newline at end of file diff --git a/datax-web/src/main/java/com/wugui/dataxweb/thread/ExecDataXOutputThread.java b/datax-web/src/main/java/com/wugui/dataxweb/thread/ExecDataXOutputThread.java index 8c347c25..d552be96 100644 --- a/datax-web/src/main/java/com/wugui/dataxweb/thread/ExecDataXOutputThread.java +++ b/datax-web/src/main/java/com/wugui/dataxweb/thread/ExecDataXOutputThread.java @@ -1,6 +1,10 @@ package com.wugui.dataxweb.thread; import cn.hutool.core.io.FileUtil; +import com.baomidou.mybatisplus.core.toolkit.Wrappers; +import com.wugui.dataxweb.config.DataXWebConfig; +import com.wugui.dataxweb.entity.JobLog; +import com.wugui.dataxweb.enums.HandleCodeEnum; import com.wugui.dataxweb.log.EtlJobFileAppender; import lombok.extern.slf4j.Slf4j; @@ -21,8 +25,10 @@ public class ExecDataXOutputThread extends Thread { private InputStream is; private String logFilePath; private String tmpFilePath; + private Long id; - public ExecDataXOutputThread(InputStream is, String logFilePath, String tmpFilePath) { + public ExecDataXOutputThread(Long id, InputStream is, String logFilePath, String tmpFilePath) { + this.id = id; this.is = is; this.logFilePath = logFilePath; this.tmpFilePath = tmpFilePath; @@ -33,12 +39,17 @@ public class ExecDataXOutputThread extends Thread { InputStreamReader isr = new InputStreamReader(is, StandardCharsets.UTF_8); BufferedReader br = new BufferedReader(isr); String line = null; + int handleCode = HandleCodeEnum.SUCCESS.getValue(); while ((line = br.readLine()) != null) { EtlJobFileAppender.appendLog(logFilePath, line); + if (handleCode != HandleCodeEnum.FAIL.getValue() && line.contains("Exception")) { + handleCode = HandleCodeEnum.FAIL.getValue(); + } log.info(line); } // 删除临时文件 FileUtil.del(new File(tmpFilePath)); + DataXWebConfig.getDataXWebConfig().getJobLogService().update(null, Wrappers.lambdaUpdate().set(JobLog::getHandleCode, handleCode).eq(JobLog::getId, id)); } catch (IOException e) { log.error("DataX 执行异常:{0}", e); }