Merge pull request #9 from fantasticKe/master

update: 优先通过环境变量获取DataX文件目录
This commit is contained in:
WeiYe 2020-01-08 09:05:36 +08:00 committed by GitHub
commit 441c170fc4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 66 additions and 20 deletions

View File

@ -1,12 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?>
<module org.jetbrains.idea.maven.project.MavenProjectsManager.isMavenModule="true" type="JAVA_MODULE" version="4">
<component name="NewModuleRootManager" LANGUAGE_LEVEL="JDK_1_8">
<output url="file://$MODULE_DIR$/target/classes" />
<output-test url="file://$MODULE_DIR$/target/test-classes" />
<content url="file://$MODULE_DIR$">
<excludeFolder url="file://$MODULE_DIR$/target" />
</content>
<orderEntry type="inheritedJdk" />
<orderEntry type="sourceFolder" forTests="false" />
</component>
</module>

View File

@ -1,5 +1,6 @@
package com.wugui.datatx.core.executor.impl;
import cn.hutool.core.collection.CollectionUtil;
import com.wugui.datatx.core.executor.JobExecutor;
import com.wugui.datatx.core.glue.GlueFactory;
import com.wugui.datatx.core.handler.IJobHandler;
@ -42,7 +43,7 @@ public class JobSpringExecutor extends JobExecutor implements ApplicationContext
}
private void initJobHandlerRepository(ApplicationContext applicationContext){
private void initJobHandlerRepository(ApplicationContext applicationContext) {
if (applicationContext == null) {
return;
}
@ -50,13 +51,13 @@ public class JobSpringExecutor extends JobExecutor implements ApplicationContext
// init job handler action
Map<String, Object> serviceBeanMap = applicationContext.getBeansWithAnnotation(JobHandler.class);
if (serviceBeanMap!=null && serviceBeanMap.size()>0) {
if (CollectionUtil.isNotEmpty(serviceBeanMap)) {
for (Object serviceBean : serviceBeanMap.values()) {
if (serviceBean instanceof IJobHandler){
if (serviceBean instanceof IJobHandler) {
String name = serviceBean.getClass().getAnnotation(JobHandler.class).value();
IJobHandler handler = (IJobHandler) serviceBean;
if (loadJobHandler(name) != null) {
throw new RuntimeException("datax-web jobhandler["+ name +"] naming conflicts.");
throw new RuntimeException("datax-web jobhandler[" + name + "] naming conflicts.");
}
registJobHandler(name, handler);
}
@ -66,10 +67,12 @@ public class JobSpringExecutor extends JobExecutor implements ApplicationContext
// ---------------------- applicationContext ----------------------
private static ApplicationContext applicationContext;
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
this.applicationContext = applicationContext;
}
public static ApplicationContext getApplicationContext() {
return applicationContext;
}

View File

@ -1,6 +1,8 @@
package com.wugui.datax.executor.core.config;
import com.wugui.datatx.core.executor.impl.JobSpringExecutor;
import com.wugui.datax.executor.util.SystemUtils;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
@ -16,6 +18,8 @@ import org.springframework.context.annotation.Configuration;
public class DataXConfig {
private Logger logger = LoggerFactory.getLogger(DataXConfig.class);
private static final String DEFAULT_LOG_PATH = "log/executor/jobhandler";
@Value("${datax.job.admin.addresses}")
private String adminAddresses;
@ -47,6 +51,8 @@ public class DataXConfig {
jobSpringExecutor.setIp(ip);
jobSpringExecutor.setPort(port);
jobSpringExecutor.setAccessToken(accessToken);
String dataXHomePath = SystemUtils.getDataXHomePath();
if (StringUtils.isNotEmpty(dataXHomePath)) logPath = dataXHomePath + DEFAULT_LOG_PATH;
jobSpringExecutor.setLogPath(logPath);
jobSpringExecutor.setLogRetentionDays(logRetentionDays);

View File

@ -10,6 +10,7 @@ import com.wugui.datatx.core.handler.annotation.JobHandler;
import com.wugui.datatx.core.log.JobLogger;
import com.wugui.datatx.core.thread.ProcessCallbackThread;
import com.wugui.datatx.core.util.ProcessUtil;
import com.wugui.datax.executor.util.SystemUtils;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
@ -29,6 +30,10 @@ import java.util.List;
@Component
public class ExecutorJobHandler extends IJobHandler {
private static final String DEFAULT_JSON = "jsons";
private static final String DEFAULT_DATAX_PY = "bin/datax.py";
@Value("${datax.executor.jsonpath}")
private String jsonpath;
@ -51,8 +56,10 @@ public class ExecutorJobHandler extends IJobHandler {
//"--loglevel=debug"
List<String> cmdarray = new ArrayList<>();
cmdarray.add("python");
String dataXHomePath = SystemUtils.getDataXHomePath();
if (StringUtils.isNotEmpty(dataXHomePath)) dataXPyPath = dataXHomePath + DEFAULT_DATAX_PY;
cmdarray.add(dataXPyPath);
if(StringUtils.isNotBlank(doc)){
if (StringUtils.isNotBlank(doc)) {
cmdarray.add(doc.replaceAll(DataxOption.SPLIT_SPACE, DataxOption.TRANSFORM_SPLIT_SPACE));
}
cmdarray.add(tmpFilePath);
@ -114,7 +121,7 @@ public class ExecutorJobHandler extends IJobHandler {
*/
private static void reader(InputStream inputStream) throws IOException {
try {
BufferedReader reader=new BufferedReader(new InputStreamReader(inputStream));
BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream));
String line;
while ((line = reader.readLine()) != null) {
JobLogger.log(line);
@ -144,6 +151,8 @@ public class ExecutorJobHandler extends IJobHandler {
private String generateTemJsonFile(String jobJson) {
String tmpFilePath;
String dataXHomePath = SystemUtils.getDataXHomePath();
if (StringUtils.isNotEmpty(dataXHomePath)) jsonpath = dataXHomePath + DEFAULT_JSON;
if (!FileUtil.exist(jsonpath)) FileUtil.mkdir(jsonpath);
tmpFilePath = jsonpath + "jobTmp-" + IdUtil.simpleUUID() + ".conf";
// 根据json写入到临时本地文件

View File

@ -0,0 +1,40 @@
package com.wugui.datax.executor.util;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author maokeluo
* @description 多隆镇楼bug退散🙏🙏🙏
* 系统工具
* @date 2020/1/7
*/
public class SystemUtils {
private static final Logger LOGGER = LoggerFactory.getLogger(SystemUtils.class);
private static String DATAX_HOME;
private SystemUtils() {
}
/**
* 获取环境变量中的Datax路径
*
* @return
*/
public static String getDataXHomePath() {
if (StringUtils.isNotEmpty(DATAX_HOME)) return DATAX_HOME;
String dataXHome = System.getenv("DATAX_HOME");
if (StringUtils.isBlank(dataXHome)) {
LOGGER.warn("DATAX_HOME 环境变量为NULL");
return null;
}
DATAX_HOME = System.getProperty("os.name").contains("Windows") ?
(!dataXHome.endsWith("\\") ? dataXHome.concat("\\") : dataXHome) :
(!dataXHome.endsWith("/") ? dataXHome.concat("/") : dataXHome);
LOGGER.info("DATAX_HOME:{}", DATAX_HOME);
return DATAX_HOME;
}
}

View File

@ -206,8 +206,8 @@ DROP TABLE IF EXISTS `job_registry`;
CREATE TABLE `job_registry` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`registry_group` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL,
`registry_key` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL,
`registry_value` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL,
`registry_key` varchar(191) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL,
`registry_value` varchar(191) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL,
`update_time` datetime(0) NULL DEFAULT NULL,
PRIMARY KEY (`id`) USING BTREE,
INDEX `i_g_k_v`(`registry_group`, `registry_key`, `registry_value`) USING BTREE