diff --git a/README.md b/README.md index 7aba96fc..d844e2e8 100644 --- a/README.md +++ b/README.md @@ -24,6 +24,8 @@ * [ ] 实现部分写插件支持自动建表功能 +## 前端项目 +https://github.com/zhouhongfa/datax-vue-admin.git ## how to run ### 1. 在父工程目录下使用maven打包 ``` @@ -47,3 +49,7 @@ curl http://localhost:8080/startJob ``` 可以看到成功跑完一个datax作业 ![](https://raw.githubusercontent.com/peter1040080742/picbed/master/20190505162333.png) + +### 5. 打开网页端启动作业 +http://localhost:8080/index.html#/datax/job +![](https://raw.githubusercontent.com/huzekang/picbed/master/20190617120207.png) \ No newline at end of file diff --git a/core/pom.xml b/core/pom.xml index bc0d1c9b..8b1bbfd7 100755 --- a/core/pom.xml +++ b/core/pom.xml @@ -155,6 +155,14 @@ ${project-sourceEncoding} + + org.apache.maven.plugins + maven-compiler-plugin + + 8 + 8 + + diff --git a/core/src/main/java/com/alibaba/datax/core/Engine.java b/core/src/main/java/com/alibaba/datax/core/Engine.java index 9ffed620..509f7db4 100755 --- a/core/src/main/java/com/alibaba/datax/core/Engine.java +++ b/core/src/main/java/com/alibaba/datax/core/Engine.java @@ -1,6 +1,5 @@ package com.alibaba.datax.core; -import cn.hutool.core.io.FileUtil; import com.alibaba.datax.common.element.ColumnCast; import com.alibaba.datax.common.exception.DataXException; import com.alibaba.datax.common.spi.ErrorCode; @@ -19,10 +18,6 @@ import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.File; -import java.io.FileNotFoundException; -import java.io.PrintWriter; -import java.io.UnsupportedEncodingException; import java.util.Arrays; import java.util.List; import java.util.Set; @@ -33,6 +28,8 @@ import java.util.regex.Pattern; * Engine是DataX入口类,该类负责初始化Job或者Task的运行容器,并运行插件的Job或者Task逻辑 */ public class Engine { + + private static final Logger LOG = LoggerFactory.getLogger(Engine.class); private static String RUNTIME_MODE; @@ -225,70 +222,9 @@ public class Engine { System.exit(exitCode); } - /** - * 测试使用springboot启动作业job - * - * @author: huzekang - * @Date: 2019-05-05 - */ - public static void testStartJob(String jobPath) { - try { - Engine.entry(jobPath); - } catch (Throwable e) { - LOG.error("\n\n经DataX智能分析,该任务最可能的错误原因是:\n" + ExceptionTracker.trace(e)); - - if (e instanceof DataXException) { - DataXException tempException = (DataXException) e; - ErrorCode errorCode = tempException.getErrorCode(); - if (errorCode instanceof FrameworkErrorCode) { - FrameworkErrorCode tempErrorCode = (FrameworkErrorCode) errorCode; - } - } - - } - } - - // todo 都是用同一个文件,是否需要考虑线程安全问题 - // 需要做成异步的,否则前端会一直loading等待完成作业 - public static void startJobByJsonStr(String jobJson) { - final String tmpFilePath = "jobTmp-"+System.currentTimeMillis()+".conf"; - // 根据json写入到临时本地文件 - PrintWriter writer = null; - try { - writer = new PrintWriter(tmpFilePath, "UTF-8"); - writer.println(jobJson); - - } catch (FileNotFoundException e) { - e.printStackTrace(); - } catch (UnsupportedEncodingException e) { - e.printStackTrace(); - } finally { - if (writer != null) { - writer.close(); - } - } - - // 使用临时本地文件启动datax作业 - try { - Engine.entry(tmpFilePath); - } catch (Throwable e) { - LOG.error("\n\n经DataX智能分析,该任务最可能的错误原因是:\n" + ExceptionTracker.trace(e)); - - if (e instanceof DataXException) { - DataXException tempException = (DataXException) e; - ErrorCode errorCode = tempException.getErrorCode(); - if (errorCode instanceof FrameworkErrorCode) { - FrameworkErrorCode tempErrorCode = (FrameworkErrorCode) errorCode; - } - } - - } - //删除临时文件 - FileUtil.del(new File(tmpFilePath)); - } } diff --git a/datax-web/pom.xml b/datax-web/pom.xml index f5a25b01..5af9a88a 100644 --- a/datax-web/pom.xml +++ b/datax-web/pom.xml @@ -44,6 +44,11 @@ mybatis-plus ${mybatisplus.version} + + com.google.guava + guava + 27.0.1-jre + io.springfox diff --git a/datax-web/src/main/java/com/wugui/dataxweb/config/WebConfig.java b/datax-web/src/main/java/com/wugui/dataxweb/config/WebConfig.java new file mode 100644 index 00000000..f19bfec8 --- /dev/null +++ b/datax-web/src/main/java/com/wugui/dataxweb/config/WebConfig.java @@ -0,0 +1,24 @@ +package com.wugui.dataxweb.config; + +import org.springframework.context.annotation.Configuration; +import org.springframework.web.servlet.config.annotation.ResourceHandlerRegistry; +import org.springframework.web.servlet.config.annotation.WebMvcConfigurer; + +/** + * 前端静态资源访问 + * + * @program: datax-all + * @author: huzekang + * @create: 2019-06-17 10:40 + **/ +@Configuration + +public class WebConfig implements WebMvcConfigurer { + + + @Override + public void addResourceHandlers(ResourceHandlerRegistry registry) { + registry.addResourceHandler("/index.html").addResourceLocations("classpath:/static/index.html"); + registry.addResourceHandler("/static/**").addResourceLocations("classpath:/static/static/"); + } +} diff --git a/datax-web/src/main/java/com/wugui/dataxweb/controller/JobController.java b/datax-web/src/main/java/com/wugui/dataxweb/controller/JobController.java index 5dc6a7ce..fe28015a 100644 --- a/datax-web/src/main/java/com/wugui/dataxweb/controller/JobController.java +++ b/datax-web/src/main/java/com/wugui/dataxweb/controller/JobController.java @@ -1,8 +1,11 @@ package com.wugui.dataxweb.controller; import com.alibaba.datax.core.Engine; +import com.baomidou.mybatisplus.extension.api.R; +import com.wugui.dataxweb.service.IDataxJobService; import io.swagger.annotations.Api; import io.swagger.annotations.ApiOperation; +import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.*; /** @@ -16,11 +19,14 @@ import org.springframework.web.bind.annotation.*; @Api(tags = "datax作业接口") public class JobController { + @Autowired + IDataxJobService iDataxJobService; + @GetMapping("/testStartJob") public void testStartJob() { // 指定获取作业配置json的接口,此处用下面mock出来的接口提供 String jobPath = "http://localhost:8080/mock_stream2stream"; - Engine.testStartJob(jobPath); + iDataxJobService.startJobByJsonStr(jobPath); } @GetMapping("/mock_oracle2mongodb") @@ -125,9 +131,9 @@ public class JobController { */ @ApiOperation("通过传入json配置启动一个datax作业") @PostMapping("/runJob") - public String runJob(@RequestBody String jobJson) { - Engine.startJobByJsonStr(jobJson); - return "success"; + public R runJob(@RequestBody String jobJson) { + String result = iDataxJobService.startJobByJsonStr(jobJson); + return R.ok(result); } diff --git a/datax-web/src/main/java/com/wugui/dataxweb/service/IDataxJobService.java b/datax-web/src/main/java/com/wugui/dataxweb/service/IDataxJobService.java new file mode 100644 index 00000000..ac5acd93 --- /dev/null +++ b/datax-web/src/main/java/com/wugui/dataxweb/service/IDataxJobService.java @@ -0,0 +1,16 @@ +package com.wugui.dataxweb.service; + +/** + * @program: datax-all + * @author: huzekang + * @create: 2019-06-17 11:25 + **/ +public interface IDataxJobService { + /** + * 根据json字符串用线程池启动一个datax作业 + * + * @author: huzekang + * @Date: 2019-06-17 + */ + String startJobByJsonStr(String jobJson); +} diff --git a/datax-web/src/main/java/com/wugui/dataxweb/service/impl/IDataxJobServiceImpl.java b/datax-web/src/main/java/com/wugui/dataxweb/service/impl/IDataxJobServiceImpl.java new file mode 100644 index 00000000..095d1752 --- /dev/null +++ b/datax-web/src/main/java/com/wugui/dataxweb/service/impl/IDataxJobServiceImpl.java @@ -0,0 +1,76 @@ +package com.wugui.dataxweb.service.impl; + +import cn.hutool.core.io.FileUtil; +import com.alibaba.datax.common.exception.DataXException; +import com.alibaba.datax.common.spi.ErrorCode; +import com.alibaba.datax.core.Engine; +import com.alibaba.datax.core.util.ExceptionTracker; +import com.alibaba.datax.core.util.FrameworkErrorCode; +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import com.wugui.dataxweb.service.IDataxJobService; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Service; + +import java.io.File; +import java.io.FileNotFoundException; +import java.io.PrintWriter; +import java.io.UnsupportedEncodingException; +import java.util.concurrent.*; + +/** + * @program: datax-all + * @author: huzekang + * @create: 2019-06-17 11:26 + **/ +@Slf4j +@Service +public class IDataxJobServiceImpl implements IDataxJobService { + private ThreadFactory namedThreadFactory = new ThreadFactoryBuilder().setNameFormat("datax-job-%d").build(); + + private ExecutorService jobPool = new ThreadPoolExecutor(5, 200, 0L, TimeUnit.MILLISECONDS, + new LinkedBlockingQueue(1024), namedThreadFactory, new ThreadPoolExecutor.AbortPolicy()); + + + @Override + public String startJobByJsonStr(String jobJson) { + + jobPool.submit(() -> { + + final String tmpFilePath = "jobTmp-" + System.currentTimeMillis() + ".conf"; + // 根据json写入到临时本地文件 + PrintWriter writer = null; + try { + writer = new PrintWriter(tmpFilePath, "UTF-8"); + writer.println(jobJson); + + } catch (FileNotFoundException | UnsupportedEncodingException e) { + e.printStackTrace(); + } finally { + if (writer != null) { + writer.close(); + } + } + + try { + // 使用临时本地文件启动datax作业 + Engine.entry(tmpFilePath); + // 删除临时文件 + FileUtil.del(new File(tmpFilePath)); + } catch (Throwable e) { + log.error("\n\n经DataX智能分析,该任务最可能的错误原因是:\n" + ExceptionTracker.trace(e)); + + if (e instanceof DataXException) { + DataXException tempException = (DataXException) e; + ErrorCode errorCode = tempException.getErrorCode(); + if (errorCode instanceof FrameworkErrorCode) { + FrameworkErrorCode tempErrorCode = (FrameworkErrorCode) errorCode; + } + } + + } + }); + + return "success"; + } + +} diff --git a/datax-web/src/main/resources/static/favicon.ico b/datax-web/src/main/resources/static/favicon.ico new file mode 100644 index 00000000..34b63ac6 Binary files /dev/null and b/datax-web/src/main/resources/static/favicon.ico differ diff --git a/datax-web/src/main/resources/static/index.html b/datax-web/src/main/resources/static/index.html new file mode 100644 index 00000000..ffa490f3 --- /dev/null +++ b/datax-web/src/main/resources/static/index.html @@ -0,0 +1 @@ +datax管理
\ No newline at end of file