diff --git a/README.md b/README.md index 1c312ebe..100c7155 100644 --- a/README.md +++ b/README.md @@ -9,7 +9,7 @@ jdk1.8 * [x] springboot重构项目 * [x] 通过restful接口调度datax完成抽取数据作业 -* [ ] 通过restful接口传入job配置json生成临时文件,根据文件配置调度datax执行该作业 +* [x] 通过restful接口传入job配置json生成临时文件,根据文件配置调度datax执行该作业 * [ ] 实现datax分布式作业 * [ ] 网页端修改job配置的json * [ ] 网页端实时查看抽取日志 diff --git a/core/pom.xml b/core/pom.xml index 5582d943..abab6fac 100755 --- a/core/pom.xml +++ b/core/pom.xml @@ -23,6 +23,11 @@ + + cn.hutool + hutool-all + 4.5.1 + commons-configuration commons-configuration diff --git a/core/src/main/java/com/alibaba/datax/core/Engine.java b/core/src/main/java/com/alibaba/datax/core/Engine.java index ae420ef0..64aa4874 100755 --- a/core/src/main/java/com/alibaba/datax/core/Engine.java +++ b/core/src/main/java/com/alibaba/datax/core/Engine.java @@ -1,5 +1,6 @@ package com.alibaba.datax.core; +import cn.hutool.core.io.FileUtil; import com.alibaba.datax.common.element.ColumnCast; import com.alibaba.datax.common.exception.DataXException; import com.alibaba.datax.common.spi.ErrorCode; @@ -14,14 +15,14 @@ import com.alibaba.datax.core.util.ExceptionTracker; import com.alibaba.datax.core.util.FrameworkErrorCode; import com.alibaba.datax.core.util.container.CoreConstant; import com.alibaba.datax.core.util.container.LoadUtil; -import org.apache.commons.cli.BasicParser; -import org.apache.commons.cli.CommandLine; -import org.apache.commons.cli.DefaultParser; -import org.apache.commons.cli.Options; import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.File; +import java.io.FileNotFoundException; +import java.io.PrintWriter; +import java.io.UnsupportedEncodingException; import java.util.Arrays; import java.util.List; import java.util.Set; @@ -231,7 +232,7 @@ public class Engine { * @Date: 2019-05-05 */ - public static void startJob(String jobPath) { + public static void testStartJob(String jobPath) { try { Engine.entry(jobPath); @@ -250,4 +251,44 @@ public class Engine { } } + //todo 都是用同一个文件,是否需要考虑线程安全问题 + public static void startJobByJsonStr(String jobJson) { + String tmpFilePath = "jobTmp.conf"; + // 根据json写入到临时本地文件 + PrintWriter writer = null; + try { + writer = new PrintWriter(tmpFilePath, "UTF-8"); + writer.println(jobJson); + + } catch (FileNotFoundException e) { + e.printStackTrace(); + } catch (UnsupportedEncodingException e) { + e.printStackTrace(); + } finally { + if (writer != null) { + writer.close(); + } + } + + // 使用临时本地文件启动datax作业 + try { + Engine.entry(tmpFilePath); + } catch (Throwable e) { + LOG.error("\n\n经DataX智能分析,该任务最可能的错误原因是:\n" + ExceptionTracker.trace(e)); + + if (e instanceof DataXException) { + DataXException tempException = (DataXException) e; + ErrorCode errorCode = tempException.getErrorCode(); + if (errorCode instanceof FrameworkErrorCode) { + FrameworkErrorCode tempErrorCode = (FrameworkErrorCode) errorCode; + } + } + + } + + //删除临时文件 + FileUtil.del(new File("jobTmp.conf")); + } + + } diff --git a/core/src/test/java/Test.java b/core/src/test/java/Test.java deleted file mode 100644 index 690f9667..00000000 --- a/core/src/test/java/Test.java +++ /dev/null @@ -1,15 +0,0 @@ -import com.alibaba.datax.core.Engine; - -/** - * @program: datax-all - * @description: - * @author: huzekang - * @create: 2019-04-17 11:37 - **/ -public class Test { - - @org.junit.Test - public void test() { -// Engine.startJob(); - } -} diff --git a/datax-web/src/main/java/com/wugui/dataxweb/controller/JobContentController.java b/datax-web/src/main/java/com/wugui/dataxweb/controller/JobContentController.java index 583ecb2b..2fa69379 100644 --- a/datax-web/src/main/java/com/wugui/dataxweb/controller/JobContentController.java +++ b/datax-web/src/main/java/com/wugui/dataxweb/controller/JobContentController.java @@ -2,6 +2,8 @@ package com.wugui.dataxweb.controller; import com.alibaba.datax.core.Engine; import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.PostMapping; +import org.springframework.web.bind.annotation.RequestBody; import org.springframework.web.bind.annotation.RestController; /** @@ -13,11 +15,11 @@ import org.springframework.web.bind.annotation.RestController; @RestController public class JobContentController { - @GetMapping("/startJob") - public void startJob() { + @GetMapping("/testStartJob") + public void testStartJob() { // 指定获取作业配置json的接口,此处用下面mock出来的接口提供 String jobPath = "http://localhost:8080/mock_stream2stream"; - Engine.startJob(jobPath); + Engine.testStartJob(jobPath); } @GetMapping("/mock_oracle2mongodb") @@ -114,5 +116,16 @@ public class JobContentController { "}"; } + /** + * 通过接口传入json配置启动一个datax作业 + * @param jobJson + * @return + */ + @PostMapping("/runJob") + public String runJob(@RequestBody String jobJson) { + Engine.startJobByJsonStr(jobJson); + return "success"; + } + }