diff --git a/datax-web/src/main/java/com/wugui/dataxweb/controller/JobController.java b/datax-web/src/main/java/com/wugui/dataxweb/controller/JobController.java index fe28015a..7ad0f073 100644 --- a/datax-web/src/main/java/com/wugui/dataxweb/controller/JobController.java +++ b/datax-web/src/main/java/com/wugui/dataxweb/controller/JobController.java @@ -1,7 +1,7 @@ package com.wugui.dataxweb.controller; -import com.alibaba.datax.core.Engine; import com.baomidou.mybatisplus.extension.api.R; +import com.wugui.dataxweb.dto.RunJobDto; import com.wugui.dataxweb.service.IDataxJobService; import io.swagger.annotations.Api; import io.swagger.annotations.ApiOperation; @@ -136,5 +136,17 @@ public class JobController { return R.ok(result); } + /** + * 通过接口传入 runJobDto 实体启动一个datax作业,并记录日志 + * + * @param runJobDto + * @return + */ + @ApiOperation("通过传入 runJobDto 实体启动一个datax作业,并记录日志") + @PostMapping("/runJobLog") + public R runJobLog(@RequestBody RunJobDto runJobDto) { + String result = iDataxJobService.startJobLog(runJobDto); + return R.ok(result); + } } diff --git a/datax-web/src/main/java/com/wugui/dataxweb/dao/JobLogMapper.java b/datax-web/src/main/java/com/wugui/dataxweb/dao/JobLogMapper.java new file mode 100644 index 00000000..e969bfc7 --- /dev/null +++ b/datax-web/src/main/java/com/wugui/dataxweb/dao/JobLogMapper.java @@ -0,0 +1,15 @@ +package com.wugui.dataxweb.dao; + +import com.baomidou.mybatisplus.core.mapper.BaseMapper; +import com.wugui.dataxweb.entity.JobLog; + +/** + * 抽取日志记录表表数据库访问层 + * + * @author zhouhongfa@gz-yibo.com + * @version v1.0 + * @since 2019-06-27 + */ +public interface JobLogMapper extends BaseMapper { + +} \ No newline at end of file diff --git a/datax-web/src/main/java/com/wugui/dataxweb/dto/RunJobDto.java b/datax-web/src/main/java/com/wugui/dataxweb/dto/RunJobDto.java new file mode 100644 index 00000000..5524cf0e --- /dev/null +++ b/datax-web/src/main/java/com/wugui/dataxweb/dto/RunJobDto.java @@ -0,0 +1,21 @@ +package com.wugui.dataxweb.dto; + +import lombok.Data; + +import java.io.Serializable; + +/** + * 用于启动任务接收的实体 + * + * @author zhouhongfa@gz-yibo.com + * @ClassName RunJobDto + * @Version 1.0 + * @since 2019/6/27 16:12 + */ +@Data +public class RunJobDto implements Serializable { + + private String jobJson; + + private Long jobConfigId; +} diff --git a/datax-web/src/main/java/com/wugui/dataxweb/entity/JobLog.java b/datax-web/src/main/java/com/wugui/dataxweb/entity/JobLog.java new file mode 100644 index 00000000..732ee74e --- /dev/null +++ b/datax-web/src/main/java/com/wugui/dataxweb/entity/JobLog.java @@ -0,0 +1,90 @@ +package com.wugui.dataxweb.entity; + +import com.alibaba.fastjson.annotation.JSONField; +import com.baomidou.mybatisplus.annotation.*; +import com.baomidou.mybatisplus.extension.activerecord.Model; +import io.swagger.annotations.ApiModel; +import io.swagger.annotations.ApiModelProperty; +import lombok.Data; + +import java.io.Serializable; +import java.util.Date; + +/** + * 抽取日志记录表实体类(job_log) + * + * @author zhouhongfa@gz-yibo.com + * @version v1.0 + * @since 2019-06-27 + */ + +@Data +@ApiModel +@TableName("job_log") +public class JobLog extends Model { + + /** + * + */ + @TableId + @ApiModelProperty(value = "") + private Long id; + + /** + * 抽取任务,主键ID + */ + @ApiModelProperty(value = "抽取任务,主键ID") + private Long jobId; + + /** + * 日志文件路径 + */ + @ApiModelProperty(value = "日志文件路径") + private String logFilePath; + + /** + * + */ + @TableField(fill = FieldFill.INSERT_UPDATE) + @JSONField(format = "yyyy/MM/dd") + @ApiModelProperty(value = "", hidden = true) + private Date updateDate; + + /** + * + */ + @TableLogic + @ApiModelProperty(value = "", hidden = true) + private Integer status; + + /** + * + */ + @ApiModelProperty(value = "", hidden = true) + private Integer createBy; + + /** + * + */ + @TableField(fill = FieldFill.INSERT) + @JSONField(format = "yyyy/MM/dd") + @ApiModelProperty(value = "", hidden = true) + private Date createDate; + + /** + * + */ + @ApiModelProperty(value = "", hidden = true) + private Integer updateBy; + + + /** + * 获取主键值 + * + * @return 主键值 + */ + @Override + protected Serializable pkVal() { + return this.id; + } +} \ No newline at end of file diff --git a/datax-web/src/main/java/com/wugui/dataxweb/service/IDataxJobService.java b/datax-web/src/main/java/com/wugui/dataxweb/service/IDataxJobService.java index ac5acd93..e4d61900 100644 --- a/datax-web/src/main/java/com/wugui/dataxweb/service/IDataxJobService.java +++ b/datax-web/src/main/java/com/wugui/dataxweb/service/IDataxJobService.java @@ -1,5 +1,7 @@ package com.wugui.dataxweb.service; +import com.wugui.dataxweb.dto.RunJobDto; + /** * @program: datax-all * @author: huzekang @@ -9,8 +11,11 @@ public interface IDataxJobService { /** * 根据json字符串用线程池启动一个datax作业 * - * @author: huzekang - * @Date: 2019-06-17 + * @author: huzekang + * @Date: 2019-06-17 + * @param jobJson */ String startJobByJsonStr(String jobJson); + + String startJobLog(RunJobDto runJobDto); } diff --git a/datax-web/src/main/java/com/wugui/dataxweb/service/IJobLogService.java b/datax-web/src/main/java/com/wugui/dataxweb/service/IJobLogService.java new file mode 100644 index 00000000..c5505acf --- /dev/null +++ b/datax-web/src/main/java/com/wugui/dataxweb/service/IJobLogService.java @@ -0,0 +1,15 @@ +package com.wugui.dataxweb.service; + +import com.baomidou.mybatisplus.extension.service.IService; +import com.wugui.dataxweb.entity.JobLog; + +/** + * 抽取日志记录表表服务接口 + * + * @author zhouhongfa@gz-yibo.com + * @version v1.0 + * @since 2019-06-27 + */ +public interface IJobLogService extends IService { + +} \ No newline at end of file diff --git a/datax-web/src/main/java/com/wugui/dataxweb/service/impl/IDataxJobServiceImpl.java b/datax-web/src/main/java/com/wugui/dataxweb/service/impl/IDataxJobServiceImpl.java index 095d1752..736fd3e7 100644 --- a/datax-web/src/main/java/com/wugui/dataxweb/service/impl/IDataxJobServiceImpl.java +++ b/datax-web/src/main/java/com/wugui/dataxweb/service/impl/IDataxJobServiceImpl.java @@ -1,14 +1,23 @@ package com.wugui.dataxweb.service.impl; import cn.hutool.core.io.FileUtil; +import cn.hutool.core.util.StrUtil; import com.alibaba.datax.common.exception.DataXException; import com.alibaba.datax.common.spi.ErrorCode; import com.alibaba.datax.core.Engine; import com.alibaba.datax.core.util.ExceptionTracker; import com.alibaba.datax.core.util.FrameworkErrorCode; +import com.alibaba.datax.core.util.container.CoreConstant; +import com.alibaba.fastjson.JSON; +import com.alibaba.fastjson.JSONObject; import com.google.common.util.concurrent.ThreadFactoryBuilder; +import com.wugui.dataxweb.dto.RunJobDto; +import com.wugui.dataxweb.entity.JobLog; import com.wugui.dataxweb.service.IDataxJobService; +import com.wugui.dataxweb.service.IJobLogService; import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Service; import java.io.File; @@ -25,11 +34,20 @@ import java.util.concurrent.*; @Slf4j @Service public class IDataxJobServiceImpl implements IDataxJobService { - private ThreadFactory namedThreadFactory = new ThreadFactoryBuilder().setNameFormat("datax-job-%d").build(); + private ThreadFactory namedThreadFactory = new ThreadFactoryBuilder().setNameFormat("datax-job-%d").build(); - private ExecutorService jobPool = new ThreadPoolExecutor(5, 200, 0L, TimeUnit.MILLISECONDS, + private ExecutorService jobPool = new ThreadPoolExecutor(5, 200, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue(1024), namedThreadFactory, new ThreadPoolExecutor.AbortPolicy()); + /** + * 日志文件保存目录 + */ + @Value("${app.etlLogDir}") + private String etlLogDir; + + @Autowired + private IJobLogService iJobLogService; + @Override public String startJobByJsonStr(String jobJson) { @@ -73,4 +91,23 @@ public class IDataxJobServiceImpl implements IDataxJobService { return "success"; } + + @Override + public String startJobLog(RunJobDto runJobDto) { + //取出 jobJson,并转为json对象 + JSONObject json = JSONObject.parseObject(runJobDto.getJobJson()); + //根据jobId和当前时间戳生成日志文件名 + String logFileName = runJobDto.getJobConfigId().toString().concat("_").concat(StrUtil.toString(System.currentTimeMillis()).concat(".log")); + String logFilePath = etlLogDir.concat(logFileName); + //记录日志 + JobLog jobLog = new JobLog(); + jobLog.setJobId(runJobDto.getJobConfigId()); + jobLog.setLogFilePath(logFilePath); + iJobLogService.save(jobLog); + //将路径放进去 + json.put(CoreConstant.LOG_FILE_PATH, logFilePath); + + //启动任务 + return startJobByJsonStr(JSON.toJSONString(json)); + } } diff --git a/datax-web/src/main/java/com/wugui/dataxweb/service/impl/JobLogServiceImpl.java b/datax-web/src/main/java/com/wugui/dataxweb/service/impl/JobLogServiceImpl.java new file mode 100644 index 00000000..a74fccbe --- /dev/null +++ b/datax-web/src/main/java/com/wugui/dataxweb/service/impl/JobLogServiceImpl.java @@ -0,0 +1,21 @@ +package com.wugui.dataxweb.service.impl; + +import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; +import com.wugui.dataxweb.dao.JobLogMapper; +import com.wugui.dataxweb.entity.JobLog; +import com.wugui.dataxweb.service.IJobLogService; +import org.springframework.stereotype.Service; +import org.springframework.transaction.annotation.Transactional; + +/** + * 抽取日志记录表表服务实现类 + * + * @author zhouhongfa@gz-yibo.com + * @version v1.0 + * @since 2019-06-27 + */ +@Service +@Transactional(readOnly = true) +public class JobLogServiceImpl extends ServiceImpl implements IJobLogService { + +} \ No newline at end of file diff --git a/datax-web/src/main/resources/application.yml b/datax-web/src/main/resources/application.yml index 6062e358..9606592a 100644 --- a/datax-web/src/main/resources/application.yml +++ b/datax-web/src/main/resources/application.yml @@ -34,3 +34,7 @@ mybatis-plus: cache-enabled: false call-setters-on-nulls: true jdbc-type-for-null: 'null' + +app: + # 数据抽取日志文件保存路径 + etlLogDir: /data/applogs/datax-web/ \ No newline at end of file