diff --git a/datax-all.iml b/datax-all.iml deleted file mode 100644 index f409c0ea..00000000 --- a/datax-all.iml +++ /dev/null @@ -1,12 +0,0 @@ - - - - - - - - - - - - \ No newline at end of file diff --git a/datax-core/src/main/java/com/wugui/datatx/core/executor/impl/JobSpringExecutor.java b/datax-core/src/main/java/com/wugui/datatx/core/executor/impl/JobSpringExecutor.java index 05ecaa3d..acd8ba8c 100644 --- a/datax-core/src/main/java/com/wugui/datatx/core/executor/impl/JobSpringExecutor.java +++ b/datax-core/src/main/java/com/wugui/datatx/core/executor/impl/JobSpringExecutor.java @@ -1,5 +1,6 @@ package com.wugui.datatx.core.executor.impl; +import cn.hutool.core.collection.CollectionUtil; import com.wugui.datatx.core.executor.JobExecutor; import com.wugui.datatx.core.glue.GlueFactory; import com.wugui.datatx.core.handler.IJobHandler; @@ -42,7 +43,7 @@ public class JobSpringExecutor extends JobExecutor implements ApplicationContext } - private void initJobHandlerRepository(ApplicationContext applicationContext){ + private void initJobHandlerRepository(ApplicationContext applicationContext) { if (applicationContext == null) { return; } @@ -50,13 +51,13 @@ public class JobSpringExecutor extends JobExecutor implements ApplicationContext // init job handler action Map serviceBeanMap = applicationContext.getBeansWithAnnotation(JobHandler.class); - if (serviceBeanMap!=null && serviceBeanMap.size()>0) { + if (CollectionUtil.isNotEmpty(serviceBeanMap)) { for (Object serviceBean : serviceBeanMap.values()) { - if (serviceBean instanceof IJobHandler){ + if (serviceBean instanceof IJobHandler) { String name = serviceBean.getClass().getAnnotation(JobHandler.class).value(); IJobHandler handler = (IJobHandler) serviceBean; if (loadJobHandler(name) != null) { - throw new RuntimeException("datax-web jobhandler["+ name +"] naming conflicts."); + throw new RuntimeException("datax-web jobhandler[" + name + "] naming conflicts."); } registJobHandler(name, handler); } @@ -66,10 +67,12 @@ public class JobSpringExecutor extends JobExecutor implements ApplicationContext // ---------------------- applicationContext ---------------------- private static ApplicationContext applicationContext; + @Override public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { this.applicationContext = applicationContext; } + public static ApplicationContext getApplicationContext() { return applicationContext; } diff --git a/datax-executor/src/main/java/com/wugui/datax/executor/core/config/DataXConfig.java b/datax-executor/src/main/java/com/wugui/datax/executor/core/config/DataXConfig.java index 50bd2734..2fd1c0c6 100644 --- a/datax-executor/src/main/java/com/wugui/datax/executor/core/config/DataXConfig.java +++ b/datax-executor/src/main/java/com/wugui/datax/executor/core/config/DataXConfig.java @@ -1,6 +1,8 @@ package com.wugui.datax.executor.core.config; import com.wugui.datatx.core.executor.impl.JobSpringExecutor; +import com.wugui.datax.executor.util.SystemUtils; +import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Value; @@ -16,6 +18,8 @@ import org.springframework.context.annotation.Configuration; public class DataXConfig { private Logger logger = LoggerFactory.getLogger(DataXConfig.class); + private static final String DEFAULT_LOG_PATH = "log/executor/jobhandler"; + @Value("${datax.job.admin.addresses}") private String adminAddresses; @@ -47,6 +51,8 @@ public class DataXConfig { jobSpringExecutor.setIp(ip); jobSpringExecutor.setPort(port); jobSpringExecutor.setAccessToken(accessToken); + String dataXHomePath = SystemUtils.getDataXHomePath(); + if (StringUtils.isNotEmpty(dataXHomePath)) logPath = dataXHomePath + DEFAULT_LOG_PATH; jobSpringExecutor.setLogPath(logPath); jobSpringExecutor.setLogRetentionDays(logRetentionDays); diff --git a/datax-executor/src/main/java/com/wugui/datax/executor/service/jobhandler/ExecutorJobHandler.java b/datax-executor/src/main/java/com/wugui/datax/executor/service/jobhandler/ExecutorJobHandler.java index 60724c32..78b837bd 100644 --- a/datax-executor/src/main/java/com/wugui/datax/executor/service/jobhandler/ExecutorJobHandler.java +++ b/datax-executor/src/main/java/com/wugui/datax/executor/service/jobhandler/ExecutorJobHandler.java @@ -10,6 +10,7 @@ import com.wugui.datatx.core.handler.annotation.JobHandler; import com.wugui.datatx.core.log.JobLogger; import com.wugui.datatx.core.thread.ProcessCallbackThread; import com.wugui.datatx.core.util.ProcessUtil; +import com.wugui.datax.executor.util.SystemUtils; import org.apache.commons.lang3.StringUtils; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Component; @@ -29,6 +30,10 @@ import java.util.List; @Component public class ExecutorJobHandler extends IJobHandler { + private static final String DEFAULT_JSON = "jsons"; + + private static final String DEFAULT_DATAX_PY = "bin/datax.py"; + @Value("${datax.executor.jsonpath}") private String jsonpath; @@ -51,8 +56,10 @@ public class ExecutorJobHandler extends IJobHandler { //"--loglevel=debug" List cmdarray = new ArrayList<>(); cmdarray.add("python"); + String dataXHomePath = SystemUtils.getDataXHomePath(); + if (StringUtils.isNotEmpty(dataXHomePath)) dataXPyPath = dataXHomePath + DEFAULT_DATAX_PY; cmdarray.add(dataXPyPath); - if(StringUtils.isNotBlank(doc)){ + if (StringUtils.isNotBlank(doc)) { cmdarray.add(doc.replaceAll(DataxOption.SPLIT_SPACE, DataxOption.TRANSFORM_SPLIT_SPACE)); } cmdarray.add(tmpFilePath); @@ -114,7 +121,7 @@ public class ExecutorJobHandler extends IJobHandler { */ private static void reader(InputStream inputStream) throws IOException { try { - BufferedReader reader=new BufferedReader(new InputStreamReader(inputStream)); + BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream)); String line; while ((line = reader.readLine()) != null) { JobLogger.log(line); @@ -144,6 +151,8 @@ public class ExecutorJobHandler extends IJobHandler { private String generateTemJsonFile(String jobJson) { String tmpFilePath; + String dataXHomePath = SystemUtils.getDataXHomePath(); + if (StringUtils.isNotEmpty(dataXHomePath)) jsonpath = dataXHomePath + DEFAULT_JSON; if (!FileUtil.exist(jsonpath)) FileUtil.mkdir(jsonpath); tmpFilePath = jsonpath + "jobTmp-" + IdUtil.simpleUUID() + ".conf"; // 根据json写入到临时本地文件 diff --git a/datax-executor/src/main/java/com/wugui/datax/executor/util/SystemUtils.java b/datax-executor/src/main/java/com/wugui/datax/executor/util/SystemUtils.java new file mode 100644 index 00000000..dd6d7fe9 --- /dev/null +++ b/datax-executor/src/main/java/com/wugui/datax/executor/util/SystemUtils.java @@ -0,0 +1,40 @@ +package com.wugui.datax.executor.util; + +import org.apache.commons.lang3.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * @author maokeluo + * @description 多隆镇楼,bug退散🙏🙏🙏 + * 系统工具 + * @date 2020/1/7 + */ +public class SystemUtils { + + private static final Logger LOGGER = LoggerFactory.getLogger(SystemUtils.class); + + private static String DATAX_HOME; + + private SystemUtils() { + } + + /** + * 获取环境变量中的Datax路径 + * + * @return + */ + public static String getDataXHomePath() { + if (StringUtils.isNotEmpty(DATAX_HOME)) return DATAX_HOME; + String dataXHome = System.getenv("DATAX_HOME"); + if (StringUtils.isBlank(dataXHome)) { + LOGGER.warn("DATAX_HOME 环境变量为NULL"); + return null; + } + DATAX_HOME = System.getProperty("os.name").contains("Windows") ? + (!dataXHome.endsWith("\\") ? dataXHome.concat("\\") : dataXHome) : + (!dataXHome.endsWith("/") ? dataXHome.concat("/") : dataXHome); + LOGGER.info("DATAX_HOME:{}", DATAX_HOME); + return DATAX_HOME; + } +} diff --git a/doc/db/datax_web.sql b/doc/db/datax_web.sql index 63e39cb7..b46ad358 100644 --- a/doc/db/datax_web.sql +++ b/doc/db/datax_web.sql @@ -206,8 +206,8 @@ DROP TABLE IF EXISTS `job_registry`; CREATE TABLE `job_registry` ( `id` int(11) NOT NULL AUTO_INCREMENT, `registry_group` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL, - `registry_key` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL, - `registry_value` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL, + `registry_key` varchar(191) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL, + `registry_value` varchar(191) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL, `update_time` datetime(0) NULL DEFAULT NULL, PRIMARY KEY (`id`) USING BTREE, INDEX `i_g_k_v`(`registry_group`, `registry_key`, `registry_value`) USING BTREE