add: 增加运行任务并保存日志文件接口;

This commit is contained in:
zhouhongfa 2019-06-27 20:47:17 +08:00
parent e3d9bd2d2c
commit ac06d058ba
9 changed files with 225 additions and 5 deletions

View File

@ -1,7 +1,7 @@
package com.wugui.dataxweb.controller;
import com.alibaba.datax.core.Engine;
import com.baomidou.mybatisplus.extension.api.R;
import com.wugui.dataxweb.dto.RunJobDto;
import com.wugui.dataxweb.service.IDataxJobService;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
@ -136,5 +136,17 @@ public class JobController {
return R.ok(result);
}
/**
* 通过接口传入 runJobDto 实体启动一个datax作业并记录日志
*
* @param runJobDto
* @return
*/
@ApiOperation("通过传入 runJobDto 实体启动一个datax作业并记录日志")
@PostMapping("/runJobLog")
public R<String> runJobLog(@RequestBody RunJobDto runJobDto) {
String result = iDataxJobService.startJobLog(runJobDto);
return R.ok(result);
}
}

View File

@ -0,0 +1,15 @@
package com.wugui.dataxweb.dao;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.wugui.dataxweb.entity.JobLog;
/**
* 抽取日志记录表表数据库访问层
*
* @author zhouhongfa@gz-yibo.com
* @version v1.0
* @since 2019-06-27
*/
public interface JobLogMapper extends BaseMapper<JobLog> {
}

View File

@ -0,0 +1,21 @@
package com.wugui.dataxweb.dto;
import lombok.Data;
import java.io.Serializable;
/**
* 用于启动任务接收的实体
*
* @author zhouhongfa@gz-yibo.com
* @ClassName RunJobDto
* @Version 1.0
* @since 2019/6/27 16:12
*/
@Data
public class RunJobDto implements Serializable {
private String jobJson;
private Long jobConfigId;
}

View File

@ -0,0 +1,90 @@
package com.wugui.dataxweb.entity;
import com.alibaba.fastjson.annotation.JSONField;
import com.baomidou.mybatisplus.annotation.*;
import com.baomidou.mybatisplus.extension.activerecord.Model;
import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
import lombok.Data;
import java.io.Serializable;
import java.util.Date;
/**
* 抽取日志记录表实体类(job_log)
*
* @author zhouhongfa@gz-yibo.com
* @version v1.0
* @since 2019-06-27
*/
@Data
@ApiModel
@TableName("job_log")
public class JobLog extends Model<JobLog> {
/**
*
*/
@TableId
@ApiModelProperty(value = "")
private Long id;
/**
* 抽取任务主键ID
*/
@ApiModelProperty(value = "抽取任务主键ID")
private Long jobId;
/**
* 日志文件路径
*/
@ApiModelProperty(value = "日志文件路径")
private String logFilePath;
/**
*
*/
@TableField(fill = FieldFill.INSERT_UPDATE)
@JSONField(format = "yyyy/MM/dd")
@ApiModelProperty(value = "", hidden = true)
private Date updateDate;
/**
*
*/
@TableLogic
@ApiModelProperty(value = "", hidden = true)
private Integer status;
/**
*
*/
@ApiModelProperty(value = "", hidden = true)
private Integer createBy;
/**
*
*/
@TableField(fill = FieldFill.INSERT)
@JSONField(format = "yyyy/MM/dd")
@ApiModelProperty(value = "", hidden = true)
private Date createDate;
/**
*
*/
@ApiModelProperty(value = "", hidden = true)
private Integer updateBy;
/**
* 获取主键值
*
* @return 主键值
*/
@Override
protected Serializable pkVal() {
return this.id;
}
}

View File

@ -1,5 +1,7 @@
package com.wugui.dataxweb.service;
import com.wugui.dataxweb.dto.RunJobDto;
/**
* @program: datax-all
* @author: huzekang
@ -9,8 +11,11 @@ public interface IDataxJobService {
/**
* 根据json字符串用线程池启动一个datax作业
*
* @author: huzekang
* @Date: 2019-06-17
* @author: huzekang
* @Date: 2019-06-17
* @param jobJson
*/
String startJobByJsonStr(String jobJson);
String startJobLog(RunJobDto runJobDto);
}

View File

@ -0,0 +1,15 @@
package com.wugui.dataxweb.service;
import com.baomidou.mybatisplus.extension.service.IService;
import com.wugui.dataxweb.entity.JobLog;
/**
* 抽取日志记录表表服务接口
*
* @author zhouhongfa@gz-yibo.com
* @version v1.0
* @since 2019-06-27
*/
public interface IJobLogService extends IService<JobLog> {
}

View File

@ -1,14 +1,23 @@
package com.wugui.dataxweb.service.impl;
import cn.hutool.core.io.FileUtil;
import cn.hutool.core.util.StrUtil;
import com.alibaba.datax.common.exception.DataXException;
import com.alibaba.datax.common.spi.ErrorCode;
import com.alibaba.datax.core.Engine;
import com.alibaba.datax.core.util.ExceptionTracker;
import com.alibaba.datax.core.util.FrameworkErrorCode;
import com.alibaba.datax.core.util.container.CoreConstant;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.wugui.dataxweb.dto.RunJobDto;
import com.wugui.dataxweb.entity.JobLog;
import com.wugui.dataxweb.service.IDataxJobService;
import com.wugui.dataxweb.service.IJobLogService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import java.io.File;
@ -25,11 +34,20 @@ import java.util.concurrent.*;
@Slf4j
@Service
public class IDataxJobServiceImpl implements IDataxJobService {
private ThreadFactory namedThreadFactory = new ThreadFactoryBuilder().setNameFormat("datax-job-%d").build();
private ThreadFactory namedThreadFactory = new ThreadFactoryBuilder().setNameFormat("datax-job-%d").build();
private ExecutorService jobPool = new ThreadPoolExecutor(5, 200, 0L, TimeUnit.MILLISECONDS,
private ExecutorService jobPool = new ThreadPoolExecutor(5, 200, 0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(1024), namedThreadFactory, new ThreadPoolExecutor.AbortPolicy());
/**
* 日志文件保存目录
*/
@Value("${app.etlLogDir}")
private String etlLogDir;
@Autowired
private IJobLogService iJobLogService;
@Override
public String startJobByJsonStr(String jobJson) {
@ -73,4 +91,23 @@ public class IDataxJobServiceImpl implements IDataxJobService {
return "success";
}
@Override
public String startJobLog(RunJobDto runJobDto) {
//取出 jobJson并转为json对象
JSONObject json = JSONObject.parseObject(runJobDto.getJobJson());
//根据jobId和当前时间戳生成日志文件名
String logFileName = runJobDto.getJobConfigId().toString().concat("_").concat(StrUtil.toString(System.currentTimeMillis()).concat(".log"));
String logFilePath = etlLogDir.concat(logFileName);
//记录日志
JobLog jobLog = new JobLog();
jobLog.setJobId(runJobDto.getJobConfigId());
jobLog.setLogFilePath(logFilePath);
iJobLogService.save(jobLog);
//将路径放进去
json.put(CoreConstant.LOG_FILE_PATH, logFilePath);
//启动任务
return startJobByJsonStr(JSON.toJSONString(json));
}
}

View File

@ -0,0 +1,21 @@
package com.wugui.dataxweb.service.impl;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.wugui.dataxweb.dao.JobLogMapper;
import com.wugui.dataxweb.entity.JobLog;
import com.wugui.dataxweb.service.IJobLogService;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
/**
* 抽取日志记录表表服务实现类
*
* @author zhouhongfa@gz-yibo.com
* @version v1.0
* @since 2019-06-27
*/
@Service
@Transactional(readOnly = true)
public class JobLogServiceImpl extends ServiceImpl<JobLogMapper, JobLog> implements IJobLogService {
}

View File

@ -34,3 +34,7 @@ mybatis-plus:
cache-enabled: false
call-setters-on-nulls: true
jdbc-type-for-null: 'null'
app:
# 数据抽取日志文件保存路径
etlLogDir: /data/applogs/datax-web/