1、datax任务内置变量:模仿阿里云商用DataWorks/ODPS提供内置变量

2、修复replaceVariable方法空指针问题
3、增加注释
This commit is contained in:
Locki 2020-09-18 14:24:54 +08:00
parent 1d074ab8f0
commit de225aac08
2 changed files with 61 additions and 4 deletions

View File

@ -24,6 +24,7 @@ public class BuildCommand {
/**
* DataX command build
*
* @param tgParam
* @param tmpFilePath
* @param dataXPyPath
@ -47,6 +48,14 @@ public class BuildCommand {
return cmdArr.toArray(new String[cmdArr.size()]);
}
/**
* 构建datax运行虚拟机参数
*
* @param tgParam
* @return {@link String}
* @author Locki
* @date 2020/9/18
*/
private static String buildDataXParam(TriggerParam tgParam) {
StringBuilder doc = new StringBuilder();
String jvmParam = StringUtils.isNotBlank(tgParam.getJvmParam()) ? tgParam.getJvmParam().trim() : tgParam.getJvmParam();
@ -56,6 +65,14 @@ public class BuildCommand {
return doc.toString();
}
/**
* 构建datax增量参数
*
* @param tgParam
* @return {@link HashMap< String, String>}
* @author Locki
* @date 2020/9/18
*/
public static HashMap<String, String> buildDataXParamToMap(TriggerParam tgParam) {
String partitionStr = tgParam.getPartitionInfo();
Integer incrementType = tgParam.getIncrementType();
@ -95,6 +112,14 @@ public class BuildCommand {
return null;
}
/**
* 任务参数封装为map
*
* @param formatParam
* @return {@link HashMap< String, String>}
* @author Locki
* @date 2020/9/18
*/
private static HashMap<String, String> getKeyValue(String formatParam) {
String[] paramArr = formatParam.split(PARAMS_SYSTEM);
HashMap<String, String> map = new HashMap<String, String>();
@ -109,6 +134,25 @@ public class BuildCommand {
return map;
}
/**
* datax任务内置变量模仿阿里云商用DataWorks/ODPS提供内置变量<br/>
* ${datax_bizdate}
* ${datax_biztime}
* ${datax_biz_unixtimestamp}
*
* @param
* @return {@link Map< String, String>}
* @author Locki
* @date 2020/9/18
*/
public static Map<String, String> builtInVar(){
Map<String, String> map = new HashMap<String, String>();
map.put("datax_biz_date", DateUtil.format(new Date(), "yyyy-MM-dd"));
map.put("datax_biz_time", DateUtil.format(new Date(), "yyyy-MM-dd HH:mm:ss"));
map.put("datax_biz_unixtimestamp", System.currentTimeMillis() + "");
return map;
}
private void buildPartitionCM(StringBuilder doc, String partitionStr) {
if (StringUtils.isNotBlank(partitionStr)) {
doc.append(SPLIT_SPACE);

View File

@ -23,8 +23,7 @@ import java.util.concurrent.FutureTask;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import static com.wugui.datax.executor.service.command.BuildCommand.buildDataXExecutorCmd;
import static com.wugui.datax.executor.service.command.BuildCommand.buildDataXParamToMap;
import static com.wugui.datax.executor.service.command.BuildCommand.*;
import static com.wugui.datax.executor.service.jobhandler.DataXConstant.DEFAULT_JSON;
import static com.wugui.datax.executor.service.logparse.AnalysisStatistics.analysisStatisticsLog;
@ -55,7 +54,9 @@ public class ExecutorJobHandler extends IJobHandler {
LogStatistics logStatistics = null;
HashMap<String, String> keyValueMap = buildDataXParamToMap(trigger);
String jobJson = replaceVariable(trigger.getJobJson(),keyValueMap);
String jobJson = replaceVariable(trigger.getJobJson(), keyValueMap);
Map<String, String> buildin = builtInVar();
jobJson = replaceVariable(jobJson, buildin);
//Generate JSON temporary file
tmpFilePath = generateTemJsonFile(jobJson);
@ -107,7 +108,19 @@ public class ExecutorJobHandler extends IJobHandler {
}
}
public static String replaceVariable(final String param,HashMap<String, String> variableMap) {
/**
* 替换json变量
*
* @param param
* @param variableMap
* @return {@link String}
* @author Locki
* @date 2020/9/18
*/
public static String replaceVariable(final String param, Map<String, String> variableMap) {
if (variableMap == null || variableMap.size() < 1) {
return param;
}
Map<String, String> mapping = new HashMap<String, String>();
Matcher matcher = VARIABLE_PATTERN.matcher(param);