1. 新增通过接口跑一个datax作业
This commit is contained in:
huzekang 2019-05-07 19:15:38 +08:00
parent 15d43a1bbf
commit b4630130cf
5 changed files with 68 additions and 24 deletions

View File

@ -9,7 +9,7 @@ jdk1.8
* [x] springboot重构项目
* [x] 通过restful接口调度datax完成抽取数据作业
* [ ] 通过restful接口传入job配置json生成临时文件根据文件配置调度datax执行该作业
* [x] 通过restful接口传入job配置json生成临时文件根据文件配置调度datax执行该作业
* [ ] 实现datax分布式作业
* [ ] 网页端修改job配置的json
* [ ] 网页端实时查看抽取日志

View File

@ -23,6 +23,11 @@
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>cn.hutool</groupId>
<artifactId>hutool-all</artifactId>
<version>4.5.1</version>
</dependency>
<dependency>
<groupId>commons-configuration</groupId>
<artifactId>commons-configuration</artifactId>

View File

@ -1,5 +1,6 @@
package com.alibaba.datax.core;
import cn.hutool.core.io.FileUtil;
import com.alibaba.datax.common.element.ColumnCast;
import com.alibaba.datax.common.exception.DataXException;
import com.alibaba.datax.common.spi.ErrorCode;
@ -14,14 +15,14 @@ import com.alibaba.datax.core.util.ExceptionTracker;
import com.alibaba.datax.core.util.FrameworkErrorCode;
import com.alibaba.datax.core.util.container.CoreConstant;
import com.alibaba.datax.core.util.container.LoadUtil;
import org.apache.commons.cli.BasicParser;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.DefaultParser;
import org.apache.commons.cli.Options;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.PrintWriter;
import java.io.UnsupportedEncodingException;
import java.util.Arrays;
import java.util.List;
import java.util.Set;
@ -231,7 +232,7 @@ public class Engine {
* @Date: 2019-05-05
*/
public static void startJob(String jobPath) {
public static void testStartJob(String jobPath) {
try {
Engine.entry(jobPath);
@ -250,4 +251,44 @@ public class Engine {
}
}
//todo 都是用同一个文件是否需要考虑线程安全问题
public static void startJobByJsonStr(String jobJson) {
String tmpFilePath = "jobTmp.conf";
// 根据json写入到临时本地文件
PrintWriter writer = null;
try {
writer = new PrintWriter(tmpFilePath, "UTF-8");
writer.println(jobJson);
} catch (FileNotFoundException e) {
e.printStackTrace();
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
} finally {
if (writer != null) {
writer.close();
}
}
// 使用临时本地文件启动datax作业
try {
Engine.entry(tmpFilePath);
} catch (Throwable e) {
LOG.error("\n\n经DataX智能分析,该任务最可能的错误原因是:\n" + ExceptionTracker.trace(e));
if (e instanceof DataXException) {
DataXException tempException = (DataXException) e;
ErrorCode errorCode = tempException.getErrorCode();
if (errorCode instanceof FrameworkErrorCode) {
FrameworkErrorCode tempErrorCode = (FrameworkErrorCode) errorCode;
}
}
}
//删除临时文件
FileUtil.del(new File("jobTmp.conf"));
}
}

View File

@ -1,15 +0,0 @@
import com.alibaba.datax.core.Engine;
/**
* @program: datax-all
* @description:
* @author: huzekang
* @create: 2019-04-17 11:37
**/
public class Test {
@org.junit.Test
public void test() {
// Engine.startJob();
}
}

View File

@ -2,6 +2,8 @@ package com.wugui.dataxweb.controller;
import com.alibaba.datax.core.Engine;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RestController;
/**
@ -13,11 +15,11 @@ import org.springframework.web.bind.annotation.RestController;
@RestController
public class JobContentController {
@GetMapping("/startJob")
public void startJob() {
@GetMapping("/testStartJob")
public void testStartJob() {
// 指定获取作业配置json的接口此处用下面mock出来的接口提供
String jobPath = "http://localhost:8080/mock_stream2stream";
Engine.startJob(jobPath);
Engine.testStartJob(jobPath);
}
@GetMapping("/mock_oracle2mongodb")
@ -114,5 +116,16 @@ public class JobContentController {
"}";
}
/**
* 通过接口传入json配置启动一个datax作业
* @param jobJson
* @return
*/
@PostMapping("/runJob")
public String runJob(@RequestBody String jobJson) {
Engine.startJobByJsonStr(jobJson);
return "success";
}
}