update: 在原来的datax Logger 下增加保存到日志文件中,后面会增加日志可视化页面;

This commit is contained in:
zhouhongfa 2019-06-27 20:50:23 +08:00
parent ac06d058ba
commit e74ae53dec
30 changed files with 274 additions and 81 deletions

View File

@ -1,5 +1,6 @@
package com.alibaba.datax.common.statistics;
import com.alibaba.datax.common.log.EtlJobLogger;
import com.alibaba.datax.common.util.HostUtils;
import org.apache.commons.lang3.time.DateFormatUtils;
import org.slf4j.Logger;
@ -97,6 +98,7 @@ public class PerfRecord implements Comparable<PerfRecord> {
//在PerfTrace里注册
PerfTrace.getInstance().tracePerfRecord(perfRecord);
perf.info(perfRecord.toString());
EtlJobLogger.log(perfRecord.toString());
}
}

View File

@ -1,5 +1,6 @@
package com.alibaba.datax.common.statistics;
import com.alibaba.datax.common.log.EtlJobLogger;
import com.alibaba.datax.common.statistics.PerfRecord.PHASE;
import com.alibaba.datax.common.util.Configuration;
import com.alibaba.datax.common.util.HostUtils;
@ -74,6 +75,7 @@ public class PerfTrace {
public static PerfTrace getInstance() {
if (instance == null) {
LOG.error("PerfTrace instance not be init! must have some error! ");
EtlJobLogger.log("PerfTrace instance not be init! must have some error! ");
synchronized (lock) {
if (instance == null) {
instance = new PerfTrace(false, -1111, -1111, 0, false);
@ -92,6 +94,7 @@ public class PerfTrace {
this.instId = jobId;
this.priority = priority;
LOG.info(String.format("PerfTrace traceId=%s, isEnable=%s, priority=%s", this.perfTraceId, this.enable, this.priority));
EtlJobLogger.log(String.format("PerfTrace traceId=%s, isEnable=%s, priority=%s", this.perfTraceId, this.enable, this.priority));
} catch (Exception e) {
// do nothing

View File

@ -1,5 +1,6 @@
package com.alibaba.datax.common.statistics;
import com.alibaba.datax.common.log.EtlJobLogger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -33,6 +34,8 @@ public class VMInfo {
vmInfo = new VMInfo();
} catch (Exception e) {
LOG.warn("no need care, the fail is ignored : vmInfo init failed " + e.getMessage(), e);
EtlJobLogger.log(e);
}
}
}
@ -84,6 +87,7 @@ public class VMInfo {
//构建startPhyOSStatus
startPhyOSStatus = new PhyOSStatus();
LOG.info("VMInfo# operatingSystem class => " + osMXBean.getClass().getName());
EtlJobLogger.log("VMInfo# operatingSystem class => " + osMXBean.getClass().getName());
if (VMInfo.isSunOsMBean(osMXBean)) {
{
startPhyOSStatus.totalPhysicalMemory = VMInfo.getLongFromOperatingSystem(osMXBean, "getTotalPhysicalMemorySize");
@ -181,10 +185,12 @@ public class VMInfo {
if (print) {
LOG.info(processCpuStatus.getDeltaString() + processMomoryStatus.getDeltaString() + processGCStatus.getDeltaString());
EtlJobLogger.log(processCpuStatus.getDeltaString() + processMomoryStatus.getDeltaString() + processGCStatus.getDeltaString());
}
} catch (Exception e) {
LOG.warn("no need care, the fail is ignored : vmInfo getDelta failed " + e.getMessage(), e);
EtlJobLogger.log(e);
}
}
@ -201,6 +207,7 @@ public class VMInfo {
return (Long) method.invoke(operatingSystem, (Object[]) null);
} catch (final Exception e) {
LOG.info(String.format("OperatingSystemMXBean %s failed, Exception = %s ", methodName, e.getMessage()));
EtlJobLogger.log(String.format("OperatingSystemMXBean %s failed, Exception = %s ", methodName, e.getMessage()));
}
return -1;

View File

@ -1,5 +1,6 @@
package com.alibaba.datax.common.util;
import com.alibaba.datax.common.log.EtlJobLogger;
import org.apache.commons.io.IOUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -40,10 +41,12 @@ public class HostUtils {
}
} catch (Exception e) {
log.warn("get hostname failed {}", e.getMessage());
EtlJobLogger.log(e);
}
}
IP = ip;
HOSTNAME = hostname;
log.info("IP {} HOSTNAME {}", IP, HOSTNAME);
EtlJobLogger.log("IP {} HOSTNAME {}", IP, HOSTNAME);
}
}

View File

@ -1,5 +1,6 @@
package com.alibaba.datax.common.util;
import com.alibaba.datax.common.log.EtlJobLogger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -113,6 +114,8 @@ public final class RetryUtil {
saveException = e;
if (i == 0) {
LOG.error(String.format("Exception when calling callable, 异常Msg:%s", saveException.getMessage()), saveException);
EtlJobLogger.log(String.format("Exception when calling callable, 异常Msg:%s", saveException.getMessage()));
EtlJobLogger.log(saveException);
}
if (null != retryExceptionClasss && !retryExceptionClasss.isEmpty()) {
@ -153,6 +156,8 @@ public final class RetryUtil {
LOG.error(String.format("Exception when calling callable, 即将尝试执行第%s次重试.本次重试计划等待[%s]ms,实际等待[%s]ms, 异常Msg:[%s]",
i+1, timeToSleep,realTimeSleep, e.getMessage()));
EtlJobLogger.log(String.format("Exception when calling callable, 即将尝试执行第%s次重试.本次重试计划等待[%s]ms,实际等待[%s]ms, 异常Msg:[%s]",
i + 1, timeToSleep, realTimeSleep, e.getMessage()));
}
}
@ -195,11 +200,13 @@ public final class RetryUtil {
return future.get(timeoutMs, TimeUnit.MILLISECONDS);
} catch (Exception e) {
LOG.warn("Try once failed", e);
EtlJobLogger.log(e);
throw e;
} finally {
if (!future.isDone()) {
future.cancel(true);
LOG.warn("Try once task not done, cancel it, active count: " + executor.getActiveCount());
EtlJobLogger.log("Try once task not done, cancel it, active count: " + executor.getActiveCount());
}
}
}

View File

@ -23,11 +23,6 @@
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>cn.hutool</groupId>
<artifactId>hutool-all</artifactId>
<version>4.5.1</version>
</dependency>
<dependency>
<groupId>commons-configuration</groupId>
<artifactId>commons-configuration</artifactId>

View File

@ -2,6 +2,8 @@ package com.alibaba.datax.core;
import com.alibaba.datax.common.element.ColumnCast;
import com.alibaba.datax.common.exception.DataXException;
import com.alibaba.datax.common.log.EtlJobFileAppender;
import com.alibaba.datax.common.log.EtlJobLogger;
import com.alibaba.datax.common.spi.ErrorCode;
import com.alibaba.datax.common.statistics.PerfTrace;
import com.alibaba.datax.common.statistics.VMInfo;
@ -48,7 +50,7 @@ public class Engine {
boolean isJob = !("taskGroup".equalsIgnoreCase(allConf
.getString(CoreConstant.DATAX_CORE_CONTAINER_MODEL)));
//JobContainer会在schedule后再行进行设置和调整值
int channelNumber =0;
int channelNumber = 0;
AbstractContainer container;
long instanceId;
int taskGroupId = -1;
@ -73,21 +75,21 @@ public class Engine {
boolean perfReportEnable = allConf.getBool(CoreConstant.DATAX_CORE_REPORT_DATAX_PERFLOG, true);
//standlone模式的datax shell任务不进行汇报
if(instanceId == -1){
if (instanceId == -1) {
perfReportEnable = false;
}
int priority = 0;
try {
priority = Integer.parseInt(System.getenv("SKYNET_PRIORITY"));
}catch (NumberFormatException e){
LOG.warn("prioriy set to 0, because NumberFormatException, the value is: "+System.getProperty("PROIORY"));
} catch (NumberFormatException e) {
LOG.warn("prioriy set to 0, because NumberFormatException, the value is: " + System.getProperty("PROIORY"));
}
Configuration jobInfoConfig = allConf.getConfiguration(CoreConstant.DATAX_JOB_JOBINFO);
//初始化PerfTrace
PerfTrace perfTrace = PerfTrace.getInstance(isJob, instanceId, taskGroupId, priority, traceEnable);
perfTrace.setJobInfo(jobInfoConfig,perfReportEnable,channelNumber);
perfTrace.setJobInfo(jobInfoConfig, perfReportEnable, channelNumber);
container.start();
}
@ -101,12 +103,12 @@ public class Engine {
filterSensitiveConfiguration(jobContent);
jobConfWithSetting.set("content",jobContent);
jobConfWithSetting.set("content", jobContent);
return jobConfWithSetting.beautify();
}
public static Configuration filterSensitiveConfiguration(Configuration configuration){
public static Configuration filterSensitiveConfiguration(Configuration configuration) {
Set<String> keys = configuration.getKeys();
for (final String key : keys) {
boolean isSensitive = StringUtils.endsWithIgnoreCase(key, "password")
@ -134,6 +136,11 @@ public class Engine {
RUNTIME_MODE = "standalone";
Configuration configuration = ConfigParser.parse(jobPath);
//取出日志文件路径
String logFilePath = configuration.getString(CoreConstant.LOG_FILE_PATH);
LOG.info("设置日志文件路径:{}", logFilePath);
//设置日志文件路径
EtlJobFileAppender.contextHolder.set(logFilePath);
long jobId;
if (!"-1".equalsIgnoreCase(jobIdString)) {
@ -159,12 +166,13 @@ public class Engine {
VMInfo vmInfo = VMInfo.getVmInfo();
if (vmInfo != null) {
LOG.info(vmInfo.toString());
EtlJobLogger.log(vmInfo.toString());
}
LOG.info("\n" + Engine.filterJobConfiguration(configuration) + "\n");
EtlJobLogger.log("\n" + Engine.filterJobConfiguration(configuration) + "\n");
LOG.debug(configuration.toJSON());
EtlJobLogger.log(configuration.toJSON());
ConfigurationValidate.doValidate(configuration);
Engine engine = new Engine();
engine.start(configuration);
@ -173,8 +181,8 @@ public class Engine {
/**
* -1 表示未能解析到 jobId
*
* only for dsc & ds & datax 3 update
* <p>
* only for dsc & ds & datax 3 update
*/
private static long parseJobIdFromUrl(List<String> patternStringList, String url) {
long result = -1;
@ -207,7 +215,7 @@ public class Engine {
} catch (Throwable e) {
exitCode = 1;
LOG.error("\n\n经DataX智能分析,该任务最可能的错误原因是:\n" + ExceptionTracker.trace(e));
EtlJobLogger.log("\n\n经DataX智能分析,该任务最可能的错误原因是: {}\n", ExceptionTracker.trace(e));
if (e instanceof DataXException) {
DataXException tempException = (DataXException) e;
ErrorCode errorCode = tempException.getErrorCode();
@ -223,8 +231,4 @@ public class Engine {
}
}

View File

@ -6,6 +6,7 @@ package com.alibaba.datax.core.container.util;
import com.alibaba.datax.common.exception.CommonErrorCode;
import com.alibaba.datax.common.exception.DataXException;
import com.alibaba.datax.common.log.EtlJobLogger;
import com.alibaba.datax.common.spi.Hook;
import com.alibaba.datax.common.util.Configuration;
import com.alibaba.datax.core.util.FrameworkErrorCode;
@ -42,6 +43,7 @@ public class HookInvoker {
public void invokeAll() {
if (!baseDir.exists() || baseDir.isFile()) {
LOG.info("No hook invoked, because base dir not exists or is a file: " + baseDir.getAbsolutePath());
EtlJobLogger.log("No hook invoked, because base dir not exists or is a file: " + baseDir.getAbsolutePath());
return;
}
@ -73,10 +75,12 @@ public class HookInvoker {
} else {
Hook hook = hookIt.next();
LOG.info("Invoke hook [{}], path: {}", hook.getName(), path);
EtlJobLogger.log("Invoke hook [{}], path: {}", hook.getName(), path);
hook.invoke(conf, msg);
}
} catch (Exception e) {
LOG.error("Exception when invoke hook", e);
EtlJobLogger.log(e);
throw DataXException.asDataXException(
CommonErrorCode.HOOK_INTERNAL_ERROR, "Exception when invoke hook", e);
} finally {

View File

@ -2,6 +2,7 @@ package com.alibaba.datax.core.job;
import com.alibaba.datax.common.constant.PluginType;
import com.alibaba.datax.common.exception.DataXException;
import com.alibaba.datax.common.log.EtlJobLogger;
import com.alibaba.datax.common.plugin.AbstractJobPlugin;
import com.alibaba.datax.common.plugin.JobPluginCollector;
import com.alibaba.datax.common.spi.Reader;
@ -95,14 +96,15 @@ public class JobContainer extends AbstractContainer {
@Override
public void start() {
LOG.info("DataX jobContainer starts job.");
EtlJobLogger.log("DataX jobContainer starts job.");
boolean hasException = false;
boolean isDryRun = false;
try {
this.startTimeStamp = System.currentTimeMillis();
isDryRun = configuration.getBool(CoreConstant.DATAX_JOB_SETTING_DRYRUN, false);
if(isDryRun) {
if (isDryRun) {
LOG.info("jobContainer starts to do preCheck ...");
EtlJobLogger.log("jobContainer starts to do preCheck ...");
this.preCheck();
} else {
userConf = configuration.clone();
@ -112,10 +114,13 @@ public class JobContainer extends AbstractContainer {
LOG.debug("jobContainer starts to do init ...");
this.init();
LOG.info("jobContainer starts to do prepare ...");
EtlJobLogger.log("jobContainer starts to do prepare ...");
this.prepare();
LOG.info("jobContainer starts to do split ...");
EtlJobLogger.log("jobContainer starts to do split ...");
this.totalStage = this.split();
LOG.info("jobContainer starts to do schedule ...");
EtlJobLogger.log("jobContainer starts to do schedule ...");
this.schedule();
LOG.debug("jobContainer starts to do post ...");
this.post();
@ -128,7 +133,7 @@ public class JobContainer extends AbstractContainer {
}
} catch (Throwable e) {
LOG.error("Exception when job run", e);
EtlJobLogger.log(e);
hasException = true;
if (e instanceof OutOfMemoryError) {
@ -162,7 +167,7 @@ public class JobContainer extends AbstractContainer {
throw DataXException.asDataXException(
FrameworkErrorCode.RUNTIME_ERROR, e);
} finally {
if(!isDryRun) {
if (!isDryRun) {
this.destroy();
this.endTimeStamp = System.currentTimeMillis();
@ -172,9 +177,11 @@ public class JobContainer extends AbstractContainer {
if (vmInfo != null) {
vmInfo.getDelta(false);
LOG.info(vmInfo.totalString());
EtlJobLogger.log(vmInfo.totalString());
}
LOG.info(PerfTrace.getInstance().summarizeNoException());
EtlJobLogger.log(PerfTrace.getInstance().summarizeNoException());
this.logStatistics();
}
}
@ -191,6 +198,7 @@ public class JobContainer extends AbstractContainer {
this.preCheckReader();
this.preCheckWriter();
LOG.info("PreCheck通过");
EtlJobLogger.log("PreCheck通过");
}
private void preCheckInit() {
@ -199,6 +207,7 @@ public class JobContainer extends AbstractContainer {
if (this.jobId < 0) {
LOG.info("Set jobId = 0");
EtlJobLogger.log("Set jobId = 0");
this.jobId = 0;
this.configuration.set(CoreConstant.DATAX_CORE_CONTAINER_JOB_ID,
this.jobId);
@ -268,6 +277,8 @@ public class JobContainer extends AbstractContainer {
PluginType.READER, this.readerPluginName));
LOG.info(String.format("DataX Reader.Job [%s] do preCheck work .",
this.readerPluginName));
EtlJobLogger.log(String.format("DataX Reader.Job [%s] do preCheck work .",
this.readerPluginName));
this.jobReader.preCheck();
classLoaderSwapper.restoreCurrentThreadClassLoader();
}
@ -277,6 +288,8 @@ public class JobContainer extends AbstractContainer {
PluginType.WRITER, this.writerPluginName));
LOG.info(String.format("DataX Writer.Job [%s] do preCheck work .",
this.writerPluginName));
EtlJobLogger.log(String.format("DataX Writer.Job [%s] do preCheck work .",
this.writerPluginName));
this.jobWriter.preCheck();
classLoaderSwapper.restoreCurrentThreadClassLoader();
}
@ -290,6 +303,7 @@ public class JobContainer extends AbstractContainer {
if (this.jobId < 0) {
LOG.info("Set jobId = 0");
EtlJobLogger.log("Set jobId = 0");
this.jobId = 0;
this.configuration.set(CoreConstant.DATAX_CORE_CONTAINER_JOB_ID,
this.jobId);
@ -312,7 +326,7 @@ public class JobContainer extends AbstractContainer {
private void preHandle() {
String handlerPluginTypeStr = this.configuration.getString(
CoreConstant.DATAX_JOB_PREHANDLER_PLUGINTYPE);
if(!StringUtils.isNotEmpty(handlerPluginTypeStr)){
if (!StringUtils.isNotEmpty(handlerPluginTypeStr)) {
return;
}
PluginType handlerPluginType;
@ -342,13 +356,14 @@ public class JobContainer extends AbstractContainer {
classLoaderSwapper.restoreCurrentThreadClassLoader();
LOG.info("After PreHandler: \n" + Engine.filterJobConfiguration(configuration) + "\n");
EtlJobLogger.log("After PreHandler: \n" + Engine.filterJobConfiguration(configuration) + "\n");
}
private void postHandle() {
String handlerPluginTypeStr = this.configuration.getString(
CoreConstant.DATAX_JOB_POSTHANDLER_PLUGINTYPE);
if(!StringUtils.isNotEmpty(handlerPluginTypeStr)){
if (!StringUtils.isNotEmpty(handlerPluginTypeStr)) {
return;
}
PluginType handlerPluginType;
@ -398,7 +413,7 @@ public class JobContainer extends AbstractContainer {
List<Configuration> transformerList = this.configuration.getListConfiguration(CoreConstant.DATAX_JOB_CONTENT_TRANSFORMER);
LOG.debug("transformer configuration: "+ JSON.toJSONString(transformerList));
LOG.debug("transformer configuration: " + JSON.toJSONString(transformerList));
/**
* 输入是reader和writer的parameter list输出是content下面元素的list
*/
@ -406,7 +421,8 @@ public class JobContainer extends AbstractContainer {
readerTaskConfigs, writerTaskConfigs, transformerList);
LOG.debug("contentConfig configuration: "+ JSON.toJSONString(contentConfig));
LOG.debug("contentConfig configuration: " + JSON.toJSONString(contentConfig));
EtlJobLogger.log("contentConfig configuration: " + JSON.toJSONString(contentConfig));
this.configuration.set(CoreConstant.DATAX_JOB_CONTENT, contentConfig);
@ -437,6 +453,7 @@ public class JobContainer extends AbstractContainer {
needChannelNumberByByte =
needChannelNumberByByte > 0 ? needChannelNumberByByte : 1;
LOG.info("Job set Max-Byte-Speed to " + globalLimitedByteSpeed + " bytes.");
EtlJobLogger.log("Job set Max-Byte-Speed to " + globalLimitedByteSpeed + " bytes.");
}
boolean isRecordLimit = (this.configuration.getInt(
@ -457,6 +474,7 @@ public class JobContainer extends AbstractContainer {
needChannelNumberByRecord =
needChannelNumberByRecord > 0 ? needChannelNumberByRecord : 1;
LOG.info("Job set Max-Record-Speed to " + globalLimitedRecordSpeed + " records.");
EtlJobLogger.log("Job set Max-Record-Speed to " + globalLimitedRecordSpeed + " records.");
}
// 取较小值
@ -476,7 +494,8 @@ public class JobContainer extends AbstractContainer {
LOG.info("Job set Channel-Number to " + this.needChannelNumber
+ " channels.");
EtlJobLogger.log("Job set Channel-Number to " + this.needChannelNumber
+ " channels.");
return;
}
@ -509,11 +528,12 @@ public class JobContainer extends AbstractContainer {
this.needChannelNumber, channelsPerTaskGroup);
LOG.info("Scheduler starts [{}] taskGroups.", taskGroupConfigs.size());
EtlJobLogger.log("Scheduler starts [{}] taskGroups.", taskGroupConfigs.size());
ExecuteMode executeMode = null;
AbstractScheduler scheduler;
try {
executeMode = ExecuteMode.STANDALONE;
executeMode = ExecuteMode.STANDALONE;
scheduler = initStandaloneScheduler(this.configuration);
//设置 executeMode
@ -529,6 +549,7 @@ public class JobContainer extends AbstractContainer {
}
LOG.info("Running by {} Mode.", executeMode);
EtlJobLogger.log("Running by {} Mode.", executeMode);
this.startTransferTimeStamp = System.currentTimeMillis();
@ -537,6 +558,7 @@ public class JobContainer extends AbstractContainer {
this.endTransferTimeStamp = System.currentTimeMillis();
} catch (Exception e) {
LOG.error("运行scheduler 模式[{}]出错.", executeMode);
EtlJobLogger.log("运行scheduler 模式[{}]出错.", executeMode);
this.endTransferTimeStamp = System.currentTimeMillis();
throw DataXException.asDataXException(
FrameworkErrorCode.RUNTIME_ERROR, e);
@ -603,8 +625,7 @@ public class JobContainer extends AbstractContainer {
super.getContainerCommunicator().report(reportCommunication);
LOG.info(String.format(
String log = String.format(
"\n" + "%-26s: %-18s\n" + "%-26s: %-18s\n" + "%-26s: %19s\n"
+ "%-26s: %19s\n" + "%-26s: %19s\n" + "%-26s: %19s\n"
+ "%-26s: %19s\n",
@ -625,12 +646,14 @@ public class JobContainer extends AbstractContainer {
String.valueOf(CommunicationTool.getTotalReadRecords(communication)),
"读写失败总数",
String.valueOf(CommunicationTool.getTotalErrorRecords(communication))
));
);
LOG.info(log);
EtlJobLogger.log(log);
if (communication.getLongCounter(CommunicationTool.TRANSFORMER_SUCCEED_RECORDS) > 0
|| communication.getLongCounter(CommunicationTool.TRANSFORMER_FAILED_RECORDS) > 0
|| communication.getLongCounter(CommunicationTool.TRANSFORMER_FILTER_RECORDS) > 0) {
LOG.info(String.format(
log = String.format(
"\n" + "%-26s: %19s\n" + "%-26s: %19s\n" + "%-26s: %19s\n",
"Transformer成功记录总数",
communication.getLongCounter(CommunicationTool.TRANSFORMER_SUCCEED_RECORDS),
@ -640,7 +663,9 @@ public class JobContainer extends AbstractContainer {
"Transformer过滤记录总数",
communication.getLongCounter(CommunicationTool.TRANSFORMER_FILTER_RECORDS)
));
);
LOG.info(log);
EtlJobLogger.log(log);
}
@ -712,6 +737,8 @@ public class JobContainer extends AbstractContainer {
PluginType.READER, this.readerPluginName));
LOG.info(String.format("DataX Reader.Job [%s] do prepare work .",
this.readerPluginName));
EtlJobLogger.log(String.format("DataX Reader.Job [%s] do prepare work .",
this.readerPluginName));
this.jobReader.prepare();
classLoaderSwapper.restoreCurrentThreadClassLoader();
}
@ -721,6 +748,8 @@ public class JobContainer extends AbstractContainer {
PluginType.WRITER, this.writerPluginName));
LOG.info(String.format("DataX Writer.Job [%s] do prepare work .",
this.writerPluginName));
EtlJobLogger.log(String.format("DataX Writer.Job [%s] do prepare work .",
this.writerPluginName));
this.jobWriter.prepare();
classLoaderSwapper.restoreCurrentThreadClassLoader();
}
@ -738,6 +767,8 @@ public class JobContainer extends AbstractContainer {
}
LOG.info("DataX Reader.Job [{}] splits to [{}] tasks.",
this.readerPluginName, readerSlicesConfigs.size());
EtlJobLogger.log("DataX Reader.Job [{}] splits to [{}] tasks.",
this.readerPluginName, readerSlicesConfigs.size());
classLoaderSwapper.restoreCurrentThreadClassLoader();
return readerSlicesConfigs;
}
@ -755,6 +786,8 @@ public class JobContainer extends AbstractContainer {
}
LOG.info("DataX Writer.Job [{}] splits to [{}] tasks.",
this.writerPluginName, writerSlicesConfigs.size());
EtlJobLogger.log("DataX Writer.Job [{}] splits to [{}] tasks.",
this.writerPluginName, writerSlicesConfigs.size());
classLoaderSwapper.restoreCurrentThreadClassLoader();
return writerSlicesConfigs;
@ -797,7 +830,7 @@ public class JobContainer extends AbstractContainer {
taskConfig.set(CoreConstant.JOB_WRITER_PARAMETER,
writerTasksConfigs.get(i));
if(transformerConfigs!=null && transformerConfigs.size()>0){
if (transformerConfigs != null && transformerConfigs.size() > 0) {
taskConfig.set(CoreConstant.JOB_TRANSFORMER, transformerConfigs);
}
@ -941,6 +974,8 @@ public class JobContainer extends AbstractContainer {
PluginType.READER, this.readerPluginName));
LOG.info("DataX Reader.Job [{}] do post work.",
this.readerPluginName);
EtlJobLogger.log("DataX Reader.Job [{}] do post work.",
this.readerPluginName);
this.jobReader.post();
classLoaderSwapper.restoreCurrentThreadClassLoader();
}
@ -950,6 +985,8 @@ public class JobContainer extends AbstractContainer {
PluginType.WRITER, this.writerPluginName));
LOG.info("DataX Writer.Job [{}] do post work.",
this.writerPluginName);
EtlJobLogger.log("DataX Writer.Job [{}] do post work.",
this.writerPluginName);
this.jobWriter.post();
classLoaderSwapper.restoreCurrentThreadClassLoader();
}

View File

@ -1,6 +1,7 @@
package com.alibaba.datax.core.job.scheduler;
import com.alibaba.datax.common.exception.DataXException;
import com.alibaba.datax.common.log.EtlJobLogger;
import com.alibaba.datax.common.util.Configuration;
import com.alibaba.datax.core.statistics.communication.Communication;
import com.alibaba.datax.core.statistics.communication.CommunicationTool;
@ -90,6 +91,7 @@ public abstract class AbstractScheduler {
if (nowJobContainerCommunication.getState() == State.SUCCEEDED) {
LOG.info("Scheduler accomplished all tasks.");
EtlJobLogger.log("Scheduler accomplished all tasks.");
break;
}

View File

@ -1,5 +1,6 @@
package com.alibaba.datax.core.statistics.container.communicator.job;
import com.alibaba.datax.common.log.EtlJobLogger;
import com.alibaba.datax.common.util.Configuration;
import com.alibaba.datax.core.statistics.communication.Communication;
import com.alibaba.datax.core.statistics.communication.CommunicationTool;
@ -48,6 +49,7 @@ public class StandAloneJobContainerCommunicator extends AbstractContainerCommuni
super.getReporter().reportJobCommunication(super.getJobId(), communication);
LOG.info(CommunicationTool.Stringify.getSnapshot(communication));
EtlJobLogger.log(CommunicationTool.Stringify.getSnapshot(communication));
reportVmInfo();
}

View File

@ -2,12 +2,12 @@ package com.alibaba.datax.core.statistics.plugin.task;
import com.alibaba.datax.common.constant.PluginType;
import com.alibaba.datax.common.element.Record;
import com.alibaba.datax.common.log.EtlJobLogger;
import com.alibaba.datax.common.util.Configuration;
import com.alibaba.datax.core.statistics.communication.Communication;
import com.alibaba.datax.core.util.container.CoreConstant;
import com.alibaba.datax.core.statistics.plugin.task.util.DirtyRecord;
import com.alibaba.datax.core.util.container.CoreConstant;
import com.alibaba.fastjson.JSON;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -63,10 +63,13 @@ public class StdoutPluginCollector extends AbstractTaskPluginCollector {
int logNum = currentLogNum.getAndIncrement();
if(logNum==0 && t!=null){
LOG.error("", t);
EtlJobLogger.log(t);
}
if (maxLogNum.intValue() < 0 || currentLogNum.intValue() < maxLogNum.intValue()) {
LOG.error("脏数据: \n"
+ this.formatDirty(dirtyRecord, t, errorMessage));
EtlJobLogger.log("脏数据: \n"
+ this.formatDirty(dirtyRecord, t, errorMessage));
}
super.collectDirtyRecord(dirtyRecord, t, errorMessage);

View File

@ -3,6 +3,7 @@ package com.alibaba.datax.core.taskgroup;
import com.alibaba.datax.common.constant.PluginType;
import com.alibaba.datax.common.exception.CommonErrorCode;
import com.alibaba.datax.common.exception.DataXException;
import com.alibaba.datax.common.log.EtlJobLogger;
import com.alibaba.datax.common.plugin.RecordSender;
import com.alibaba.datax.common.plugin.TaskPluginCollector;
import com.alibaba.datax.common.statistics.PerfRecord;
@ -131,7 +132,9 @@ public class TaskGroupContainer extends AbstractContainer {
LOG.info(String.format(
"taskGroupId=[%d] start [%d] channels for [%d] tasks.",
this.taskGroupId, channelNumber, taskCountInThisTaskGroup));
EtlJobLogger.log(String.format(
"taskGroupId=[%d] start [%d] channels for [%d] tasks.",
this.taskGroupId, channelNumber, taskCountInThisTaskGroup));
this.containerCommunicator.registerCommunication(taskConfigs);
Map<Integer, Configuration> taskConfigMap = buildTaskConfigMap(taskConfigs); //taskId与task配置
@ -180,6 +183,8 @@ public class TaskGroupContainer extends AbstractContainer {
LOG.info("taskGroup[{}] taskId[{}] is successed, used[{}]ms",
this.taskGroupId, taskId, usedTime);
//usedTime*1000*1000 转换成PerfRecord记录的ns这里主要是简单登记进行最长任务的打印因此增加特定静态方法
EtlJobLogger.log("taskGroup[{}] taskId[{}] is successed, used[{}]ms",
this.taskGroupId, taskId, usedTime);
PerfRecord.addPerfRecord(taskGroupId, taskId, PerfRecord.PHASE.TASK_TOTAL,taskStartTime, usedTime * 1000L * 1000L);
taskStartTimeMap.remove(taskId);
taskConfigMap.remove(taskId);
@ -222,6 +227,8 @@ public class TaskGroupContainer extends AbstractContainer {
}else{
LOG.info("taskGroup[{}] taskId[{}] attemptCount[{}] has already shutdown",
this.taskGroupId, taskId, lastExecutor.getAttemptCount());
EtlJobLogger.log("taskGroup[{}] taskId[{}] attemptCount[{}] has already shutdown",
this.taskGroupId, taskId, lastExecutor.getAttemptCount());
}
}
Configuration taskConfigForRun = taskMaxRetryTimes > 1 ? taskConfig.clone() : taskConfig;
@ -238,6 +245,8 @@ public class TaskGroupContainer extends AbstractContainer {
taskFailedExecutorMap.remove(taskId);
LOG.info("taskGroup[{}] taskId[{}] attemptCount[{}] is started",
this.taskGroupId, taskId, attemptCount);
EtlJobLogger.log("taskGroup[{}] taskId[{}] attemptCount[{}] is started",
this.taskGroupId, taskId, attemptCount);
}
//4.任务列表为空executor已结束, 搜集状态为success--->成功
@ -247,6 +256,7 @@ public class TaskGroupContainer extends AbstractContainer {
lastTaskGroupContainerCommunication, taskCountInThisTaskGroup);
LOG.info("taskGroup[{}] completed it's tasks.", this.taskGroupId);
EtlJobLogger.log("taskGroup[{}] completed it's tasks.", this.taskGroupId);
break;
}
@ -290,9 +300,11 @@ public class TaskGroupContainer extends AbstractContainer {
if (vmInfo != null) {
vmInfo.getDelta(false);
LOG.info(vmInfo.totalString());
EtlJobLogger.log(vmInfo.totalString());
}
LOG.info(PerfTrace.getInstance().summarizeNoException());
EtlJobLogger.log(PerfTrace.getInstance().summarizeNoException());
}
}
}

View File

@ -1,5 +1,6 @@
package com.alibaba.datax.core.taskgroup.runner;
import com.alibaba.datax.common.log.EtlJobLogger;
import com.alibaba.datax.common.plugin.AbstractTaskPlugin;
import com.alibaba.datax.common.plugin.RecordSender;
import com.alibaba.datax.common.spi.Reader;
@ -71,6 +72,7 @@ public class ReaderRunner extends AbstractRunner implements Runnable {
} catch (Throwable e) {
LOG.error("Reader runner Received Exceptions:", e);
super.markFail(e);
EtlJobLogger.log(e);
} finally {
LOG.debug("task reader starts to do destroy ...");
PerfRecord desPerfRecord = new PerfRecord(getTaskGroupId(), getTaskId(), PerfRecord.PHASE.READ_TASK_DESTROY);

View File

@ -1,6 +1,7 @@
package com.alibaba.datax.core.transport.channel;
import com.alibaba.datax.common.element.Record;
import com.alibaba.datax.common.log.EtlJobLogger;
import com.alibaba.datax.common.util.Configuration;
import com.alibaba.datax.core.statistics.communication.Communication;
import com.alibaba.datax.core.statistics.communication.CommunicationTool;
@ -57,6 +58,8 @@ public abstract class Channel {
CoreConstant.DATAX_CORE_TRANSPORT_CHANNEL_SPEED_RECORD, 10000);
if (capacity <= 0) {
EtlJobLogger.log(String.format(
"通道容量[%d]必须大于0.", capacity));
throw new IllegalArgumentException(String.format(
"通道容量[%d]必须大于0.", capacity));
}

View File

@ -1,6 +1,7 @@
package com.alibaba.datax.core.util;
import com.alibaba.datax.common.exception.DataXException;
import com.alibaba.datax.common.log.EtlJobLogger;
import com.alibaba.datax.common.util.Configuration;
import com.alibaba.datax.core.util.container.CoreConstant;
import org.apache.commons.io.FileUtils;
@ -55,6 +56,7 @@ public final class ConfigParser {
}catch (Exception e){
//吞掉异常保持log干净这里message足够
LOG.warn(String.format("插件[%s,%s]加载失败1s后重试... Exception:%s ", readerPluginName, writerPluginName, e.getMessage()));
EtlJobLogger.log(String.format("插件[%s,%s]加载失败1s后重试... Exception:%s ", readerPluginName, writerPluginName, e.getMessage()));
try {
Thread.sleep(1000);
} catch (InterruptedException e1) {

View File

@ -1,6 +1,7 @@
package com.alibaba.datax.core.util;
import com.alibaba.datax.common.exception.DataXException;
import com.alibaba.datax.common.log.EtlJobLogger;
import com.alibaba.datax.common.util.Configuration;
import com.alibaba.datax.core.transport.transformer.*;
import com.alibaba.datax.core.util.container.CoreConstant;
@ -47,6 +48,7 @@ public class TransformerUtil {
* 延迟load 第三方插件的function并按需load
*/
LOG.info(String.format(" user config tranformers [%s], loading...", functionNames));
EtlJobLogger.log(String.format(" user config tranformers [%s], loading...", functionNames));
TransformerRegistry.loadTransformerFromLocalStorage(functionNames);
int i = 0;
@ -99,6 +101,9 @@ public class TransformerUtil {
LOG.info(String.format(" %s of transformer init success. name=%s, isNative=%s parameter = %s"
, i, transformerInfo.getTransformer().getTransformerName()
, transformerInfo.isNative(), configuration.getConfiguration("parameter")));
EtlJobLogger.log(String.format(" %s of transformer init success. name=%s, isNative=%s parameter = %s"
, i, transformerInfo.getTransformer().getTransformerName()
, transformerInfo.isNative(), configuration.getConfiguration("parameter")));
}
return result;

View File

@ -149,6 +149,8 @@ public class CoreConstant {
public static final String CURRENT_SERVICE_PASSWORD = "current.service.password";
public static String LOG_FILE_PATH = "logFilePath";
// ----------------------------- 环境变量 ---------------------------------
/**

View File

@ -1,13 +1,8 @@
package com.alibaba.datax.plugin.rdbms.reader;
import com.alibaba.datax.common.element.BoolColumn;
import com.alibaba.datax.common.element.BytesColumn;
import com.alibaba.datax.common.element.DateColumn;
import com.alibaba.datax.common.element.DoubleColumn;
import com.alibaba.datax.common.element.LongColumn;
import com.alibaba.datax.common.element.Record;
import com.alibaba.datax.common.element.StringColumn;
import com.alibaba.datax.common.element.*;
import com.alibaba.datax.common.exception.DataXException;
import com.alibaba.datax.common.log.EtlJobLogger;
import com.alibaba.datax.common.plugin.RecordSender;
import com.alibaba.datax.common.plugin.TaskPluginCollector;
import com.alibaba.datax.common.statistics.PerfRecord;
@ -22,7 +17,6 @@ import com.alibaba.datax.plugin.rdbms.util.DBUtilErrorCode;
import com.alibaba.datax.plugin.rdbms.util.DataBaseType;
import com.alibaba.datax.plugin.rdbms.util.RdbmsException;
import com.google.common.collect.Lists;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -56,6 +50,8 @@ public class CommonRdbmsReader {
LOG.debug("After job init(), job config now is:[\n{}\n]",
originalConfig.toJSON());
EtlJobLogger.log("After job init(), job config now is:[\n{}\n]",
originalConfig.toJSON());
}
public void preCheck(Configuration originalConfig,DataBaseType dataBaseType) {
@ -158,9 +154,11 @@ public class CommonRdbmsReader {
DBUtilErrorCode.JDBC_OB10_ADDRESS_ERROR, "JDBC OB10格式错误请联系askdatax");
}
LOG.info("this is ob1_0 jdbc url.");
EtlJobLogger.log("this is ob1_0 jdbc url.");
this.username = ss[1].trim() +":"+this.username;
this.jdbcUrl = ss[2];
LOG.info("this is ob1_0 jdbc url. user=" + this.username + " :url=" + this.jdbcUrl);
EtlJobLogger.log("this is ob1_0 jdbc url. user=" + this.username + " :url=" + this.jdbcUrl);
}
this.mandatoryEncoding = readerSliceConfig.getString(Key.MANDATORY_ENCODING, "");
@ -179,6 +177,8 @@ public class CommonRdbmsReader {
LOG.info("Begin to read record by Sql: [{}\n] {}.",
querySql, basicMsg);
EtlJobLogger.log("Begin to read record by Sql: [{}\n] {}.",
querySql, basicMsg);
PerfRecord queryPerfRecord = new PerfRecord(taskGroupId,taskId, PerfRecord.PHASE.SQL_QUERY);
queryPerfRecord.start();
@ -215,6 +215,8 @@ public class CommonRdbmsReader {
//目前大盘是依赖这个打印而之前这个Finish read record是包含了sql查询和result next的全部时间
LOG.info("Finished read record by Sql: [{}\n] {}.",
querySql, basicMsg);
EtlJobLogger.log("Finished read record by Sql: [{}\n] {}.",
querySql, basicMsg);
}catch (Exception e) {
throw RdbmsException.asQueryException(this.dataBaseType, e, querySql, table, username);
@ -340,6 +342,7 @@ public class CommonRdbmsReader {
LOG.debug("read data " + record.toString()
+ " occur exception:", e);
}
EtlJobLogger.log(e);
//TODO 这里识别为脏数据靠谱吗
taskPluginCollector.collectDirtyRecord(record, e);
if (e instanceof DataXException) {

View File

@ -2,10 +2,10 @@ package com.alibaba.datax.plugin.rdbms.reader;
import com.alibaba.datax.common.element.*;
import com.alibaba.datax.common.exception.DataXException;
import com.alibaba.datax.common.log.EtlJobLogger;
import com.alibaba.datax.common.plugin.RecordSender;
import com.alibaba.datax.common.plugin.TaskPluginCollector;
import com.alibaba.datax.plugin.rdbms.util.DBUtilErrorCode;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -126,6 +126,7 @@ public class ResultSetReadProxy {
LOG.debug("read data " + record.toString()
+ " occur exception:", e);
}
EtlJobLogger.log(e);
//TODO 这里识别为脏数据靠谱吗
taskPluginCollector.collectDirtyRecord(record, e);

View File

@ -1,5 +1,6 @@
package com.alibaba.datax.plugin.rdbms.reader.util;
import com.alibaba.datax.common.log.EtlJobLogger;
import com.alibaba.datax.common.util.Configuration;
import com.alibaba.datax.plugin.rdbms.reader.Constant;
import com.alibaba.datax.plugin.rdbms.reader.Key;
@ -52,14 +53,17 @@ public class HintUtil {
//主库不并发读取
if(finalHint.indexOf("parallel") > 0 && DBUtil.isOracleMaster(jdbcUrl, username, password)){
LOG.info("master:{} will not use hint:{}", jdbcUrl, finalHint);
EtlJobLogger.log("master:{} will not use hint:{}", jdbcUrl, finalHint);
}else{
LOG.info("table:{} use hint:{}.", table, finalHint);
EtlJobLogger.log("table:{} use hint:{}.", table, finalHint);
return finalHint + column;
}
}
}
} catch (Exception e){
LOG.warn("match hint exception, will not use hint", e);
EtlJobLogger.log(e);
}
return column;
}

View File

@ -1,6 +1,7 @@
package com.alibaba.datax.plugin.rdbms.reader.util;
import com.alibaba.datax.common.exception.DataXException;
import com.alibaba.datax.common.log.EtlJobLogger;
import com.alibaba.datax.common.util.Configuration;
import com.alibaba.datax.common.util.ListUtil;
import com.alibaba.datax.plugin.rdbms.reader.Constant;
@ -100,6 +101,7 @@ public final class OriginalConfPretreatmentUtil {
i, Key.JDBC_URL), jdbcUrl);
LOG.info("Available jdbcUrl:{}.",jdbcUrl);
EtlJobLogger.log("Available jdbcUrl:{}.", jdbcUrl);
if (isTableMode) {
// table 方式
@ -144,6 +146,7 @@ public final class OriginalConfPretreatmentUtil {
if (1 == userConfiguredColumns.size()
&& "*".equals(userConfiguredColumns.get(0))) {
LOG.warn("您的配置文件中的列配置存在一定的风险. 因为您未配置读取数据库表的列,当您的表字段个数、类型有变动时,可能影响任务正确性甚至会运行出错。请检查您的配置并作出修改.");
EtlJobLogger.log("您的配置文件中的列配置存在一定的风险. 因为您未配置读取数据库表的列,当您的表字段个数、类型有变动时,可能影响任务正确性甚至会运行出错。请检查您的配置并作出修改.");
// 回填其值需要以 String 的方式转交后续处理
originalConfig.set(Key.COLUMN, "*");
} else {
@ -161,6 +164,8 @@ public final class OriginalConfPretreatmentUtil {
tableName);
LOG.info("table:[{}] has columns:[{}].",
tableName, StringUtils.join(allColumns, ","));
EtlJobLogger.log("table:[{}] has columns:[{}].",
tableName, StringUtils.join(allColumns, ","));
// warn:注意mysql表名区分大小写
allColumns = ListUtil.valueToLowerCase(allColumns);
List<String> quotedColumns = new ArrayList<String>();
@ -203,6 +208,7 @@ public final class OriginalConfPretreatmentUtil {
if (null != userConfiguredColumns
&& userConfiguredColumns.size() > 0) {
LOG.warn("您的配置有误. 由于您读取数据库表采用了querySql的方式, 所以您不需要再配置 column. 如果您不想看到这条提醒,请移除您源头表中配置中的 column.");
EtlJobLogger.log("您的配置有误. 由于您读取数据库表采用了querySql的方式, 所以您不需要再配置 column. 如果您不想看到这条提醒,请移除您源头表中配置中的 column.");
originalConfig.remove(Key.COLUMN);
}
@ -210,6 +216,7 @@ public final class OriginalConfPretreatmentUtil {
String where = originalConfig.getString(Key.WHERE, null);
if (StringUtils.isNotBlank(where)) {
LOG.warn("您的配置有误. 由于您读取数据库表采用了querySql的方式, 所以您不需要再配置 where. 如果您不想看到这条提醒,请移除您源头表中配置中的 where.");
EtlJobLogger.log("您的配置有误. 由于您读取数据库表采用了querySql的方式, 所以您不需要再配置 where. 如果您不想看到这条提醒,请移除您源头表中配置中的 where.");
originalConfig.remove(Key.WHERE);
}
@ -217,6 +224,7 @@ public final class OriginalConfPretreatmentUtil {
String splitPk = originalConfig.getString(Key.SPLIT_PK, null);
if (StringUtils.isNotBlank(splitPk)) {
LOG.warn("您的配置有误. 由于您读取数据库表采用了querySql的方式, 所以您不需要再配置 splitPk. 如果您不想看到这条提醒,请移除您源头表中配置中的 splitPk.");
EtlJobLogger.log("您的配置有误. 由于您读取数据库表采用了querySql的方式, 所以您不需要再配置 splitPk. 如果您不想看到这条提醒,请移除您源头表中配置中的 splitPk.");
originalConfig.remove(Key.SPLIT_PK);
}
}

View File

@ -1,12 +1,12 @@
package com.alibaba.datax.plugin.rdbms.reader.util;
import com.alibaba.datax.common.exception.DataXException;
import com.alibaba.datax.common.log.EtlJobLogger;
import com.alibaba.datax.common.util.Configuration;
import com.alibaba.datax.plugin.rdbms.reader.Constant;
import com.alibaba.datax.plugin.rdbms.reader.Key;
import com.alibaba.datax.plugin.rdbms.util.*;
import com.alibaba.fastjson.JSON;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.commons.lang3.tuple.Pair;
@ -116,6 +116,8 @@ public class SingleTableSplitUtil {
LOG.info("After split(), allQuerySql=[\n{}\n].",
StringUtils.join(allQuerySql, "\n"));
EtlJobLogger.log("After split(), allQuerySql=[\n{}\n].",
StringUtils.join(allQuerySql, "\n"));
tempConfig.set(Key.QUERY_SQL, tempQuerySql);
pluginParams.add(tempConfig);
@ -171,6 +173,7 @@ public class SingleTableSplitUtil {
private static Pair<Object, Object> checkSplitPk(Connection conn, String pkRangeSQL, int fetchSize, String table,
String username, Configuration configuration) {
LOG.info("split pk [sql={}] is running... ", pkRangeSQL);
EtlJobLogger.log("split pk [sql={}] is running... ", pkRangeSQL);
ResultSet rs = null;
Pair<Object, Object> minMaxPK = null;
try {
@ -318,6 +321,7 @@ public class SingleTableSplitUtil {
Connection conn = DBUtil.getConnection(DATABASE_TYPE, jdbcURL,
username, password);
LOG.info("split pk [sql={}] is running... ", splitSql);
EtlJobLogger.log("split pk [sql={}] is running... ", splitSql);
ResultSet rs = null;
List<Pair<Object, Integer>> splitedRange = new ArrayList<Pair<Object, Integer>>();
try {
@ -347,6 +351,7 @@ public class SingleTableSplitUtil {
DBUtil.closeDBResources(rs, null, null);
}
LOG.debug(JSON.toJSONString(splitedRange));
EtlJobLogger.log(JSON.toJSONString(splitedRange));
List<String> rangeSql = new ArrayList<String>();
int splitedRangeSize = splitedRange.size();
// warn: splitedRangeSize may be 0 or 1切分规则为IS NULL以及 IS NOT NULL

View File

@ -1,13 +1,13 @@
package com.alibaba.datax.plugin.rdbms.util;
import com.alibaba.datax.common.exception.DataXException;
import com.alibaba.datax.common.log.EtlJobLogger;
import com.alibaba.datax.common.util.Configuration;
import com.alibaba.datax.common.util.RetryUtil;
import com.alibaba.datax.plugin.rdbms.reader.Key;
import com.alibaba.druid.sql.parser.SQLParserUtils;
import com.alibaba.druid.sql.parser.SQLStatementParser;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.ImmutableTriple;
import org.apache.commons.lang3.tuple.Triple;
@ -151,13 +151,16 @@ public final class DBUtil {
}
} else {
LOG.warn("SHOW SLAVE STATUS has no result");
EtlJobLogger.log("SHOW SLAVE STATUS has no result");
}
}
} else {
LOG.warn("SHOW VARIABLES like 'read_only' has no result");
EtlJobLogger.log("SHOW VARIABLES like 'read_only' has no result");
}
} catch (Exception e) {
LOG.warn("checkSlave failed, errorMessage:[{}].", e.getMessage());
EtlJobLogger.log("checkSlave failed, errorMessage:[{}].", e.getMessage());
}
return false;
}
@ -208,6 +211,7 @@ public final class DBUtil {
}
} catch (Exception e) {
LOG.warn("Check the database has the Insert Privilege failed, errorMessage:[{}]", e.getMessage());
EtlJobLogger.log("Check the database has the Insert Privilege failed, errorMessage:[{}]", e.getMessage());
}
if (tableNames.isEmpty())
return true;
@ -231,10 +235,12 @@ public final class DBUtil {
if(e.getMessage() != null && e.getMessage().contains("insufficient privileges")) {
hasInsertPrivilege = false;
LOG.warn("User [" + userName +"] has no 'insert' privilege on table[" + tableName + "], errorMessage:[{}]", e.getMessage());
EtlJobLogger.log("User [" + userName + "] has no 'insert' privilege on table[" + tableName + "], errorMessage:[{}]", e.getMessage());
}
} else {
hasInsertPrivilege = false;
LOG.warn("User [" + userName + "] has no 'insert' privilege on table[" + tableName + "], errorMessage:[{}]", e.getMessage());
EtlJobLogger.log("User [" + userName + "] has no 'insert' privilege on table[" + tableName + "], errorMessage:[{}]", e.getMessage());
}
}
}
@ -242,6 +248,7 @@ public final class DBUtil {
connection.close();
} catch (SQLException e) {
LOG.warn("connection close failed, " + e.getMessage());
EtlJobLogger.log("connection close failed, " + e.getMessage());
}
return hasInsertPrivilege;
}
@ -260,12 +267,14 @@ public final class DBUtil {
} catch (Exception e) {
hasInsertPrivilege = false;
LOG.warn("User [" + userName +"] has no 'delete' privilege on table[" + tableName + "], errorMessage:[{}]", e.getMessage());
EtlJobLogger.log("User [" + userName + "] has no 'delete' privilege on table[" + tableName + "], errorMessage:[{}]", e.getMessage());
}
}
try {
connection.close();
} catch (SQLException e) {
LOG.warn("connection close failed, " + e.getMessage());
EtlJobLogger.log("connection close failed, " + e.getMessage());
}
return hasInsertPrivilege;
}
@ -366,9 +375,11 @@ public final class DBUtil {
DBUtilErrorCode.JDBC_OB10_ADDRESS_ERROR, "JDBC OB10格式错误请联系askdatax");
}
LOG.info("this is ob1_0 jdbc url.");
EtlJobLogger.log("this is ob1_0 jdbc url.");
user = ss[1].trim() +":"+user;
url = ss[2];
LOG.info("this is ob1_0 jdbc url. user="+user+" :url="+url);
EtlJobLogger.log("this is ob1_0 jdbc url. user=" + user + " :url=" + url);
}
Properties prop = new Properties();
@ -598,6 +609,8 @@ public final class DBUtil {
} catch (Exception e) {
LOG.warn("test connection of [{}] failed, for {}.", url,
e.getMessage());
EtlJobLogger.log("test connection of [{}] failed, for {}.", url,
e.getMessage());
} finally {
DBUtil.closeDBResources(null, connection);
}
@ -613,6 +626,7 @@ public final class DBUtil {
for (String pre : preSql) {
if (doPreCheck(connection, pre) == false) {
LOG.warn("doPreCheck failed.");
EtlJobLogger.log("doPreCheck failed.");
return false;
}
}
@ -621,6 +635,8 @@ public final class DBUtil {
} catch (Exception e) {
LOG.warn("test connection of [{}] failed, for {}.", url,
e.getMessage());
EtlJobLogger.log("test connection of [{}] failed, for {}.", url,
e.getMessage());
} finally {
DBUtil.closeDBResources(null, connection);
}
@ -675,6 +691,9 @@ public final class DBUtil {
LOG.warn(
"pre check failed. It should return one result:0, pre:[{}].",
pre);
EtlJobLogger.log(
"pre check failed. It should return one result:0, pre:[{}].",
pre);
return false;
}
@ -687,9 +706,14 @@ public final class DBUtil {
LOG.warn(
"pre check failed. It should return one result:0, pre:[{}].",
pre);
EtlJobLogger.log(
"pre check failed. It should return one result:0, pre:[{}].",
pre);
} catch (Exception e) {
LOG.warn("pre check failed. pre:[{}], errorMessage:[{}].", pre,
e.getMessage());
EtlJobLogger.log("pre check failed. pre:[{}], errorMessage:[{}].", pre,
e.getMessage());
} finally {
DBUtil.closeResultSet(rs);
}
@ -740,6 +764,7 @@ public final class DBUtil {
for (String sessionSql : sessions) {
LOG.info("execute sql:[{}]", sessionSql);
EtlJobLogger.log("execute sql:[{}]", sessionSql);
try {
DBUtil.executeSqlWithoutResultSet(stmt, sessionSql);
} catch (SQLException e) {

View File

@ -3,6 +3,7 @@ package com.alibaba.datax.plugin.rdbms.writer;
import com.alibaba.datax.common.element.Column;
import com.alibaba.datax.common.element.Record;
import com.alibaba.datax.common.exception.DataXException;
import com.alibaba.datax.common.log.EtlJobLogger;
import com.alibaba.datax.common.plugin.RecordReceiver;
import com.alibaba.datax.common.plugin.TaskPluginCollector;
import com.alibaba.datax.common.util.Configuration;
@ -42,6 +43,8 @@ public class CommonRdbmsWriter {
LOG.debug("After job init(), originalConfig now is:[\n{}\n]",
originalConfig.toJSON());
EtlJobLogger.log("After job init(), originalConfig now is:[\n{}\n]",
originalConfig.toJSON());
}
/*目前只支持MySQL Writer跟Oracle Writer;检查PreSQL跟PostSQL语法以及insertdelete权限*/
@ -117,6 +120,8 @@ public class CommonRdbmsWriter {
jdbcUrl, username, password);
LOG.info("Begin to execute preSqls:[{}]. context info:{}.",
StringUtils.join(renderedPreSqls, ";"), jdbcUrl);
EtlJobLogger.log("Begin to execute preSqls:[{}]. context info:{}.",
StringUtils.join(renderedPreSqls, ";"), jdbcUrl);
WriterUtil.executeSqls(conn, renderedPreSqls, jdbcUrl, dataBaseType);
DBUtil.closeDBResources(null, null, conn);
@ -125,6 +130,8 @@ public class CommonRdbmsWriter {
LOG.debug("After job prepare(), originalConfig now is:[\n{}\n]",
originalConfig.toJSON());
EtlJobLogger.log("After job prepare(), originalConfig now is:[\n{}\n]",
originalConfig.toJSON());
}
public List<Configuration> split(Configuration originalConfig,
@ -159,6 +166,9 @@ public class CommonRdbmsWriter {
LOG.info(
"Begin to execute postSqls:[{}]. context info:{}.",
StringUtils.join(renderedPostSqls, ";"), jdbcUrl);
EtlJobLogger.log(
"Begin to execute postSqls:[{}]. context info:{}.",
StringUtils.join(renderedPostSqls, ";"), jdbcUrl);
WriterUtil.executeSqls(conn, renderedPostSqls, jdbcUrl, dataBaseType);
DBUtil.closeDBResources(null, null, conn);
}
@ -217,9 +227,11 @@ public class CommonRdbmsWriter {
DBUtilErrorCode.JDBC_OB10_ADDRESS_ERROR, "JDBC OB10格式错误请联系askdatax");
}
LOG.info("this is ob1_0 jdbc url.");
EtlJobLogger.log("this is ob1_0 jdbc url.");
this.username = ss[1].trim() + ":" + this.username;
this.jdbcUrl = ss[2];
LOG.info("this is ob1_0 jdbc url. user=" + this.username + " :url=" + this.jdbcUrl);
EtlJobLogger.log("this is ob1_0 jdbc url. user=" + this.username + " :url=" + this.jdbcUrl);
}
this.table = writerSliceConfig.getString(Key.TABLE);
@ -253,6 +265,8 @@ public class CommonRdbmsWriter {
if (tableNumber != 1) {
LOG.info("Begin to execute preSqls:[{}]. context info:{}.",
StringUtils.join(this.preSqls, ";"), BASIC_MESSAGE);
EtlJobLogger.log("Begin to execute preSqls:[{}]. context info:{}.",
StringUtils.join(this.preSqls, ";"), BASIC_MESSAGE);
WriterUtil.executeSqls(connection, this.preSqls, BASIC_MESSAGE, dataBaseType);
}
@ -334,6 +348,8 @@ public class CommonRdbmsWriter {
LOG.info("Begin to execute postSqls:[{}]. context info:{}.",
StringUtils.join(this.postSqls, ";"), BASIC_MESSAGE);
EtlJobLogger.log("Begin to execute postSqls:[{}]. context info:{}.",
StringUtils.join(this.postSqls, ";"), BASIC_MESSAGE);
WriterUtil.executeSqls(connection, this.postSqls, BASIC_MESSAGE, dataBaseType);
DBUtil.closeDBResources(null, null, connection);
}
@ -358,6 +374,7 @@ public class CommonRdbmsWriter {
connection.commit();
} catch (SQLException e) {
LOG.warn("回滚此次写入, 采用每次写入一行方式提交. 因为:" + e.getMessage());
EtlJobLogger.log("回滚此次写入, 采用每次写入一行方式提交. 因为:" + e.getMessage());
connection.rollback();
doOneInsert(connection, buffer);
} catch (Exception e) {
@ -382,7 +399,7 @@ public class CommonRdbmsWriter {
preparedStatement.execute();
} catch (SQLException e) {
LOG.debug(e.toString());
EtlJobLogger.log(e.toString());
this.taskPluginCollector.collectDirtyRecord(record, e);
} finally {
// 最后不要忘了关闭 preparedStatement

View File

@ -1,6 +1,7 @@
package com.alibaba.datax.plugin.rdbms.writer.util;
import com.alibaba.datax.common.exception.DataXException;
import com.alibaba.datax.common.log.EtlJobLogger;
import com.alibaba.datax.common.util.Configuration;
import com.alibaba.datax.common.util.ListUtil;
import com.alibaba.datax.plugin.rdbms.util.*;
@ -107,9 +108,12 @@ public final class OriginalConfPretreatmentUtil {
LOG.info("table:[{}] all columns:[\n{}\n].", oneTable,
StringUtils.join(allColumns, ","));
EtlJobLogger.log("table:[{}] all columns:[\n{}\n].", oneTable,
StringUtils.join(allColumns, ","));
if (1 == userConfiguredColumns.size() && "*".equals(userConfiguredColumns.get(0))) {
LOG.warn("您的配置文件中的列配置信息存在风险. 因为您配置的写入数据库表的列为*,当您的表字段个数、类型有变动时,可能影响任务正确性甚至会运行出错。请检查您的配置并作出修改.");
EtlJobLogger.log("您的配置文件中的列配置信息存在风险. 因为您配置的写入数据库表的列为*,当您的表字段个数、类型有变动时,可能影响任务正确性甚至会运行出错。请检查您的配置并作出修改.");
// 回填其值需要以 String 的方式转交后续处理
originalConfig.set(Key.COLUMN, allColumns);
@ -163,6 +167,7 @@ public final class OriginalConfPretreatmentUtil {
String writeDataSqlTemplate = WriterUtil.getWriteTemplate(columns, valueHolders, writeMode,dataBaseType, forceUseUpdate);
LOG.info("Write data [\n{}\n], which jdbcUrl like:[{}]", writeDataSqlTemplate, jdbcUrl);
EtlJobLogger.log("Write data [\n{}\n], which jdbcUrl like:[{}]", writeDataSqlTemplate, jdbcUrl);
originalConfig.set(Constant.INSERT_OR_REPLACE_TEMPLATE_MARK, writeDataSqlTemplate);
}

View File

@ -1,6 +1,7 @@
package com.alibaba.datax.plugin.rdbms.writer.util;
import com.alibaba.datax.common.exception.DataXException;
import com.alibaba.datax.common.log.EtlJobLogger;
import com.alibaba.datax.common.util.Configuration;
import com.alibaba.datax.plugin.rdbms.util.DBUtil;
import com.alibaba.datax.plugin.rdbms.util.DBUtilErrorCode;
@ -15,7 +16,9 @@ import org.slf4j.LoggerFactory;
import java.sql.Connection;
import java.sql.Statement;
import java.util.*;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
public final class WriterUtil {
private static final Logger LOG = LoggerFactory.getLogger(WriterUtil.class);
@ -180,6 +183,8 @@ public final class WriterUtil {
if (null != renderedPreSqls && !renderedPreSqls.isEmpty()) {
LOG.info("Begin to preCheck preSqls:[{}].",
StringUtils.join(renderedPreSqls, ";"));
EtlJobLogger.log("Begin to preCheck preSqls:[{}].",
StringUtils.join(renderedPreSqls, ";"));
for(String sql : renderedPreSqls) {
try{
DBUtil.sqlValid(sql, type);
@ -203,6 +208,8 @@ public final class WriterUtil {
LOG.info("Begin to preCheck postSqls:[{}].",
StringUtils.join(renderedPostSqls, ";"));
EtlJobLogger.log("Begin to preCheck postSqls:[{}].",
StringUtils.join(renderedPostSqls, ";"));
for(String sql : renderedPostSqls) {
try{
DBUtil.sqlValid(sql, type);

View File

@ -2,6 +2,7 @@ package com.alibaba.datax.plugin.unstructuredstorage.reader;
import com.alibaba.datax.common.element.*;
import com.alibaba.datax.common.exception.DataXException;
import com.alibaba.datax.common.log.EtlJobLogger;
import com.alibaba.datax.common.plugin.RecordSender;
import com.alibaba.datax.common.plugin.TaskPluginCollector;
import com.alibaba.datax.common.util.Configuration;
@ -9,10 +10,11 @@ import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.TypeReference;
import com.csvreader.CsvReader;
import org.apache.commons.beanutils.BeanUtils;
import io.airlift.compress.snappy.SnappyCodec;
import io.airlift.compress.snappy.SnappyFramedInputStream;
import org.anarres.lzo.*;
import org.anarres.lzo.LzoDecompressor1x_safe;
import org.anarres.lzo.LzoInputStream;
import org.apache.commons.beanutils.BeanUtils;
import org.apache.commons.compress.compressors.CompressorInputStream;
import org.apache.commons.compress.compressors.bzip2.BZip2CompressorInputStream;
import org.apache.commons.compress.compressors.gzip.GzipCompressorInputStream;
@ -99,6 +101,8 @@ public class UnstructuredStorageReaderUtil {
encoding = Constant.DEFAULT_ENCODING;
LOG.warn(String.format("您配置的encoding为[%s], 使用默认值[%s]", encoding,
Constant.DEFAULT_ENCODING));
EtlJobLogger.log(String.format("您配置的encoding为[%s], 使用默认值[%s]", encoding,
Constant.DEFAULT_ENCODING));
}
List<Configuration> column = readerSliceConfig
@ -252,6 +256,8 @@ public class UnstructuredStorageReaderUtil {
if (null == delimiterInStr) {
LOG.warn(String.format("您没有配置列分隔符, 使用默认值[%s]",
Constant.DEFAULT_FIELD_DELIMITER));
EtlJobLogger.log(String.format("您没有配置列分隔符, 使用默认值[%s]",
Constant.DEFAULT_FIELD_DELIMITER));
}
// warn: default value ',', fieldDelimiter could be \n(lineDelimiter)
@ -277,6 +283,8 @@ public class UnstructuredStorageReaderUtil {
String fetchLine = reader.readLine();
LOG.info(String.format("Header line %s has been skiped.",
fetchLine));
EtlJobLogger.log(String.format("Header line %s has been skiped.",
fetchLine));
}
csvReader = new CsvReader(reader);
csvReader.setDelimiter(fieldDelimiter);
@ -329,6 +337,8 @@ public class UnstructuredStorageReaderUtil {
if (null == delimiterInStr) {
LOG.warn(String.format("您没有配置列分隔符, 使用默认值[%s]",
Constant.DEFAULT_FIELD_DELIMITER));
EtlJobLogger.log(String.format("您没有配置列分隔符, 使用默认值[%s]",
Constant.DEFAULT_FIELD_DELIMITER));
}
// warn: default value ',', fieldDelimiter could be \n(lineDelimiter)
// for no fieldDelimiter
@ -388,6 +398,7 @@ public class UnstructuredStorageReaderUtil {
sourceLine.length, columnIndex + 1,
StringUtils.join(sourceLine, ","));
LOG.warn(message);
EtlJobLogger.log(message);
throw new IndexOutOfBoundsException(message);
}
@ -463,6 +474,7 @@ public class UnstructuredStorageReaderUtil {
String errorMessage = String.format(
"您配置的列类型暂不支持 : [%s]", columnType);
LOG.error(errorMessage);
EtlJobLogger.log(errorMessage);
throw DataXException
.asDataXException(
UnstructuredStorageReaderErrorCode.NOT_SUPPORT_TYPE,
@ -632,6 +644,7 @@ public class UnstructuredStorageReaderUtil {
UnstructuredStorageReaderUtil.csvReaderConfigMap = JSON.parseObject(csvReaderConfig, new TypeReference<HashMap<String, Object>>() {});
}catch (Exception e) {
LOG.info(String.format("WARN!!!!忽略csvReaderConfig配置! 配置错误,值只能为空或者为Map结构,您配置的值为: %s", csvReaderConfig));
EtlJobLogger.log(String.format("WARN!!!!忽略csvReaderConfig配置! 配置错误,值只能为空或者为Map结构,您配置的值为: %s", csvReaderConfig));
}
}
}
@ -685,14 +698,18 @@ public class UnstructuredStorageReaderUtil {
try {
BeanUtils.populate(csvReader,UnstructuredStorageReaderUtil.csvReaderConfigMap);
LOG.info(String.format("csvReaderConfig设置成功,设置后CsvReader:%s", JSON.toJSONString(csvReader)));
EtlJobLogger.log(String.format("csvReaderConfig设置成功,设置后CsvReader:%s", JSON.toJSONString(csvReader)));
} catch (Exception e) {
LOG.info(String.format("WARN!!!!忽略csvReaderConfig配置!通过BeanUtils.populate配置您的csvReaderConfig发生异常,您配置的值为: %s;请检查您的配置!CsvReader使用默认值[%s]",
JSON.toJSONString(UnstructuredStorageReaderUtil.csvReaderConfigMap),JSON.toJSONString(csvReader)));
EtlJobLogger.log(String.format("WARN!!!!忽略csvReaderConfig配置!通过BeanUtils.populate配置您的csvReaderConfig发生异常,您配置的值为: %s;请检查您的配置!CsvReader使用默认值[%s]",
JSON.toJSONString(UnstructuredStorageReaderUtil.csvReaderConfigMap), JSON.toJSONString(csvReader)));
}
}else {
//默认关闭安全模式, 放开10W字节的限制
csvReader.setSafetySwitch(false);
LOG.info(String.format("CsvReader使用默认值[%s],csvReaderConfig值为[%s]",JSON.toJSONString(csvReader),JSON.toJSONString(UnstructuredStorageReaderUtil.csvReaderConfigMap)));
EtlJobLogger.log(String.format("CsvReader使用默认值[%s],csvReaderConfig值为[%s]", JSON.toJSONString(csvReader), JSON.toJSONString(UnstructuredStorageReaderUtil.csvReaderConfigMap)));
}
}
}

View File

@ -1,15 +1,15 @@
package com.alibaba.datax.plugin.unstructuredstorage.writer;
import java.io.IOException;
import java.io.Writer;
import java.util.List;
import com.alibaba.datax.common.log.EtlJobLogger;
import com.csvreader.CsvWriter;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.csvreader.CsvWriter;
import java.io.IOException;
import java.io.Writer;
import java.util.List;
public class TextCsvWriterManager {
public static UnstructuredWriter produceUnstructuredWriter(
@ -43,6 +43,7 @@ class CsvWriterImpl implements UnstructuredWriter {
public void writeOneRecord(List<String> splitedRows) throws IOException {
if (splitedRows.isEmpty()) {
LOG.info("Found one record line which is empty.");
EtlJobLogger.log("Found one record line which is empty.");
}
this.csvWriter.writeRecord((String[]) splitedRows
.toArray(new String[0]));
@ -76,6 +77,7 @@ class TextWriterImpl implements UnstructuredWriter {
public void writeOneRecord(List<String> splitedRows) throws IOException {
if (splitedRows.isEmpty()) {
LOG.info("Found one record line which is empty.");
EtlJobLogger.log("Found one record line which is empty.");
}
this.textWriter.write(String.format("%s%s",
StringUtils.join(splitedRows, this.fieldDelimiter),

View File

@ -1,18 +1,14 @@
package com.alibaba.datax.plugin.unstructuredstorage.writer;
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.io.UnsupportedEncodingException;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.UUID;
import com.alibaba.datax.common.element.Column;
import com.alibaba.datax.common.element.DateColumn;
import com.alibaba.datax.common.element.Record;
import com.alibaba.datax.common.exception.DataXException;
import com.alibaba.datax.common.log.EtlJobLogger;
import com.alibaba.datax.common.plugin.RecordReceiver;
import com.alibaba.datax.common.plugin.TaskPluginCollector;
import com.alibaba.datax.common.util.Configuration;
import com.google.common.collect.Sets;
import org.apache.commons.compress.compressors.CompressorOutputStream;
import org.apache.commons.compress.compressors.bzip2.BZip2CompressorOutputStream;
import org.apache.commons.compress.compressors.gzip.GzipCompressorOutputStream;
@ -22,14 +18,10 @@ import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.alibaba.datax.common.element.Column;
import com.alibaba.datax.common.element.DateColumn;
import com.alibaba.datax.common.element.Record;
import com.alibaba.datax.common.exception.DataXException;
import com.alibaba.datax.common.plugin.RecordReceiver;
import com.alibaba.datax.common.plugin.TaskPluginCollector;
import com.alibaba.datax.common.util.Configuration;
import com.google.common.collect.Sets;
import java.io.*;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.*;
public class UnstructuredStorageWriterUtil {
private UnstructuredStorageWriterUtil() {
@ -66,6 +58,8 @@ public class UnstructuredStorageWriterUtil {
// like " ", null
LOG.warn(String.format("您的encoding配置为空, 将使用默认值[%s]",
Constant.DEFAULT_ENCODING));
EtlJobLogger.log(String.format("您的encoding配置为空, 将使用默认值[%s]",
Constant.DEFAULT_ENCODING));
writerConfiguration.set(Key.ENCODING, Constant.DEFAULT_ENCODING);
} else {
try {
@ -107,6 +101,8 @@ public class UnstructuredStorageWriterUtil {
if (null == delimiterInStr) {
LOG.warn(String.format("您没有配置列分隔符, 使用默认值[%s]",
Constant.DEFAULT_FIELD_DELIMITER));
EtlJobLogger.log(String.format("您没有配置列分隔符, 使用默认值[%s]",
Constant.DEFAULT_FIELD_DELIMITER));
writerConfiguration.set(Key.FIELD_DELIMITER,
Constant.DEFAULT_FIELD_DELIMITER);
}
@ -126,6 +122,7 @@ public class UnstructuredStorageWriterUtil {
public static List<Configuration> split(Configuration writerSliceConfig,
Set<String> originAllFileExists, int mandatoryNumber) {
LOG.info("begin do split...");
EtlJobLogger.log("begin do split...");
Set<String> allFileExists = new HashSet<String>();
allFileExists.addAll(originAllFileExists);
List<Configuration> writerSplitConfigs = new ArrayList<Configuration>();
@ -146,9 +143,12 @@ public class UnstructuredStorageWriterUtil {
splitedTaskConfig.set(Key.FILE_NAME, fullFileName);
LOG.info(String
.format("splited write file name:[%s]", fullFileName));
EtlJobLogger.log(String
.format("splited write file name:[%s]", fullFileName));
writerSplitConfigs.add(splitedTaskConfig);
}
LOG.info("end do split.");
EtlJobLogger.log("end do split.");
return writerSplitConfigs;
}
@ -187,6 +187,8 @@ public class UnstructuredStorageWriterUtil {
if (StringUtils.isBlank(encoding)) {
LOG.warn(String.format("您配置的encoding为[%s], 使用默认值[%s]", encoding,
Constant.DEFAULT_ENCODING));
EtlJobLogger.log(String.format("您配置的encoding为[%s], 使用默认值[%s]", encoding,
Constant.DEFAULT_ENCODING));
encoding = Constant.DEFAULT_ENCODING;
}
String compress = config.getString(Key.COMPRESS);
@ -264,6 +266,8 @@ public class UnstructuredStorageWriterUtil {
if (null == delimiterInStr) {
LOG.warn(String.format("您没有配置列分隔符, 使用默认值[%s]",
Constant.DEFAULT_FIELD_DELIMITER));
EtlJobLogger.log(String.format("您没有配置列分隔符, 使用默认值[%s]",
Constant.DEFAULT_FIELD_DELIMITER));
}
// warn: fieldDelimiter could not be '' for no fieldDelimiter