From e74ae53dec86f53433cc9d22fc792f8ebc535564 Mon Sep 17 00:00:00 2001 From: zhouhongfa Date: Thu, 27 Jun 2019 20:50:23 +0800 Subject: [PATCH] =?UTF-8?q?update:=20=E5=9C=A8=E5=8E=9F=E6=9D=A5=E7=9A=84d?= =?UTF-8?q?atax=20Logger=20=E4=B8=8B=E5=A2=9E=E5=8A=A0=E4=BF=9D=E5=AD=98?= =?UTF-8?q?=E5=88=B0=E6=97=A5=E5=BF=97=E6=96=87=E4=BB=B6=E4=B8=AD=EF=BC=8C?= =?UTF-8?q?=E5=90=8E=E9=9D=A2=E4=BC=9A=E5=A2=9E=E5=8A=A0=E6=97=A5=E5=BF=97?= =?UTF-8?q?=E5=8F=AF=E8=A7=86=E5=8C=96=E9=A1=B5=E9=9D=A2=EF=BC=9B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../datax/common/statistics/PerfRecord.java | 2 + .../datax/common/statistics/PerfTrace.java | 3 + .../datax/common/statistics/VMInfo.java | 7 ++ .../alibaba/datax/common/util/HostUtils.java | 3 + .../alibaba/datax/common/util/RetryUtil.java | 7 ++ core/pom.xml | 5 -- .../java/com/alibaba/datax/core/Engine.java | 36 +++++----- .../core/container/util/HookInvoker.java | 4 ++ .../alibaba/datax/core/job/JobContainer.java | 69 ++++++++++++++----- .../core/job/scheduler/AbstractScheduler.java | 2 + .../StandAloneJobContainerCommunicator.java | 2 + .../plugin/task/StdoutPluginCollector.java | 7 +- .../core/taskgroup/TaskGroupContainer.java | 14 +++- .../core/taskgroup/runner/ReaderRunner.java | 2 + .../datax/core/transport/channel/Channel.java | 3 + .../alibaba/datax/core/util/ConfigParser.java | 2 + .../datax/core/util/TransformerUtil.java | 5 ++ .../core/util/container/CoreConstant.java | 2 + .../rdbms/reader/CommonRdbmsReader.java | 19 ++--- .../rdbms/reader/ResultSetReadProxy.java | 3 +- .../plugin/rdbms/reader/util/HintUtil.java | 4 ++ .../util/OriginalConfPretreatmentUtil.java | 8 +++ .../reader/util/SingleTableSplitUtil.java | 7 +- .../datax/plugin/rdbms/util/DBUtil.java | 27 +++++++- .../rdbms/writer/CommonRdbmsWriter.java | 19 ++++- .../util/OriginalConfPretreatmentUtil.java | 5 ++ .../plugin/rdbms/writer/util/WriterUtil.java | 9 ++- .../reader/UnstructuredStorageReaderUtil.java | 21 +++++- .../writer/TextCsvWriterManager.java | 12 ++-- .../writer/UnstructuredStorageWriterUtil.java | 46 +++++++------ 30 files changed, 274 insertions(+), 81 deletions(-) diff --git a/common/src/main/java/com/alibaba/datax/common/statistics/PerfRecord.java b/common/src/main/java/com/alibaba/datax/common/statistics/PerfRecord.java index 74b26eeb..64ee8b4f 100644 --- a/common/src/main/java/com/alibaba/datax/common/statistics/PerfRecord.java +++ b/common/src/main/java/com/alibaba/datax/common/statistics/PerfRecord.java @@ -1,5 +1,6 @@ package com.alibaba.datax.common.statistics; +import com.alibaba.datax.common.log.EtlJobLogger; import com.alibaba.datax.common.util.HostUtils; import org.apache.commons.lang3.time.DateFormatUtils; import org.slf4j.Logger; @@ -97,6 +98,7 @@ public class PerfRecord implements Comparable { //在PerfTrace里注册 PerfTrace.getInstance().tracePerfRecord(perfRecord); perf.info(perfRecord.toString()); + EtlJobLogger.log(perfRecord.toString()); } } diff --git a/common/src/main/java/com/alibaba/datax/common/statistics/PerfTrace.java b/common/src/main/java/com/alibaba/datax/common/statistics/PerfTrace.java index ea9aa421..e827a232 100644 --- a/common/src/main/java/com/alibaba/datax/common/statistics/PerfTrace.java +++ b/common/src/main/java/com/alibaba/datax/common/statistics/PerfTrace.java @@ -1,5 +1,6 @@ package com.alibaba.datax.common.statistics; +import com.alibaba.datax.common.log.EtlJobLogger; import com.alibaba.datax.common.statistics.PerfRecord.PHASE; import com.alibaba.datax.common.util.Configuration; import com.alibaba.datax.common.util.HostUtils; @@ -74,6 +75,7 @@ public class PerfTrace { public static PerfTrace getInstance() { if (instance == null) { LOG.error("PerfTrace instance not be init! must have some error! "); + EtlJobLogger.log("PerfTrace instance not be init! must have some error! "); synchronized (lock) { if (instance == null) { instance = new PerfTrace(false, -1111, -1111, 0, false); @@ -92,6 +94,7 @@ public class PerfTrace { this.instId = jobId; this.priority = priority; LOG.info(String.format("PerfTrace traceId=%s, isEnable=%s, priority=%s", this.perfTraceId, this.enable, this.priority)); + EtlJobLogger.log(String.format("PerfTrace traceId=%s, isEnable=%s, priority=%s", this.perfTraceId, this.enable, this.priority)); } catch (Exception e) { // do nothing diff --git a/common/src/main/java/com/alibaba/datax/common/statistics/VMInfo.java b/common/src/main/java/com/alibaba/datax/common/statistics/VMInfo.java index cab42a4b..f184041b 100644 --- a/common/src/main/java/com/alibaba/datax/common/statistics/VMInfo.java +++ b/common/src/main/java/com/alibaba/datax/common/statistics/VMInfo.java @@ -1,5 +1,6 @@ package com.alibaba.datax.common.statistics; +import com.alibaba.datax.common.log.EtlJobLogger; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -33,6 +34,8 @@ public class VMInfo { vmInfo = new VMInfo(); } catch (Exception e) { LOG.warn("no need care, the fail is ignored : vmInfo init failed " + e.getMessage(), e); + EtlJobLogger.log(e); + } } } @@ -84,6 +87,7 @@ public class VMInfo { //构建startPhyOSStatus startPhyOSStatus = new PhyOSStatus(); LOG.info("VMInfo# operatingSystem class => " + osMXBean.getClass().getName()); + EtlJobLogger.log("VMInfo# operatingSystem class => " + osMXBean.getClass().getName()); if (VMInfo.isSunOsMBean(osMXBean)) { { startPhyOSStatus.totalPhysicalMemory = VMInfo.getLongFromOperatingSystem(osMXBean, "getTotalPhysicalMemorySize"); @@ -181,10 +185,12 @@ public class VMInfo { if (print) { LOG.info(processCpuStatus.getDeltaString() + processMomoryStatus.getDeltaString() + processGCStatus.getDeltaString()); + EtlJobLogger.log(processCpuStatus.getDeltaString() + processMomoryStatus.getDeltaString() + processGCStatus.getDeltaString()); } } catch (Exception e) { LOG.warn("no need care, the fail is ignored : vmInfo getDelta failed " + e.getMessage(), e); + EtlJobLogger.log(e); } } @@ -201,6 +207,7 @@ public class VMInfo { return (Long) method.invoke(operatingSystem, (Object[]) null); } catch (final Exception e) { LOG.info(String.format("OperatingSystemMXBean %s failed, Exception = %s ", methodName, e.getMessage())); + EtlJobLogger.log(String.format("OperatingSystemMXBean %s failed, Exception = %s ", methodName, e.getMessage())); } return -1; diff --git a/common/src/main/java/com/alibaba/datax/common/util/HostUtils.java b/common/src/main/java/com/alibaba/datax/common/util/HostUtils.java index 2ed8f101..b7c82fbc 100644 --- a/common/src/main/java/com/alibaba/datax/common/util/HostUtils.java +++ b/common/src/main/java/com/alibaba/datax/common/util/HostUtils.java @@ -1,5 +1,6 @@ package com.alibaba.datax.common.util; +import com.alibaba.datax.common.log.EtlJobLogger; import org.apache.commons.io.IOUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -40,10 +41,12 @@ public class HostUtils { } } catch (Exception e) { log.warn("get hostname failed {}", e.getMessage()); + EtlJobLogger.log(e); } } IP = ip; HOSTNAME = hostname; log.info("IP {} HOSTNAME {}", IP, HOSTNAME); + EtlJobLogger.log("IP {} HOSTNAME {}", IP, HOSTNAME); } } diff --git a/common/src/main/java/com/alibaba/datax/common/util/RetryUtil.java b/common/src/main/java/com/alibaba/datax/common/util/RetryUtil.java index 33c71287..054cdbd8 100755 --- a/common/src/main/java/com/alibaba/datax/common/util/RetryUtil.java +++ b/common/src/main/java/com/alibaba/datax/common/util/RetryUtil.java @@ -1,5 +1,6 @@ package com.alibaba.datax.common.util; +import com.alibaba.datax.common.log.EtlJobLogger; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -113,6 +114,8 @@ public final class RetryUtil { saveException = e; if (i == 0) { LOG.error(String.format("Exception when calling callable, 异常Msg:%s", saveException.getMessage()), saveException); + EtlJobLogger.log(String.format("Exception when calling callable, 异常Msg:%s", saveException.getMessage())); + EtlJobLogger.log(saveException); } if (null != retryExceptionClasss && !retryExceptionClasss.isEmpty()) { @@ -153,6 +156,8 @@ public final class RetryUtil { LOG.error(String.format("Exception when calling callable, 即将尝试执行第%s次重试.本次重试计划等待[%s]ms,实际等待[%s]ms, 异常Msg:[%s]", i+1, timeToSleep,realTimeSleep, e.getMessage())); + EtlJobLogger.log(String.format("Exception when calling callable, 即将尝试执行第%s次重试.本次重试计划等待[%s]ms,实际等待[%s]ms, 异常Msg:[%s]", + i + 1, timeToSleep, realTimeSleep, e.getMessage())); } } @@ -195,11 +200,13 @@ public final class RetryUtil { return future.get(timeoutMs, TimeUnit.MILLISECONDS); } catch (Exception e) { LOG.warn("Try once failed", e); + EtlJobLogger.log(e); throw e; } finally { if (!future.isDone()) { future.cancel(true); LOG.warn("Try once task not done, cancel it, active count: " + executor.getActiveCount()); + EtlJobLogger.log("Try once task not done, cancel it, active count: " + executor.getActiveCount()); } } } diff --git a/core/pom.xml b/core/pom.xml index 8b1bbfd7..9daae780 100755 --- a/core/pom.xml +++ b/core/pom.xml @@ -23,11 +23,6 @@ - - cn.hutool - hutool-all - 4.5.1 - commons-configuration commons-configuration diff --git a/core/src/main/java/com/alibaba/datax/core/Engine.java b/core/src/main/java/com/alibaba/datax/core/Engine.java index 509f7db4..4717d646 100755 --- a/core/src/main/java/com/alibaba/datax/core/Engine.java +++ b/core/src/main/java/com/alibaba/datax/core/Engine.java @@ -2,6 +2,8 @@ package com.alibaba.datax.core; import com.alibaba.datax.common.element.ColumnCast; import com.alibaba.datax.common.exception.DataXException; +import com.alibaba.datax.common.log.EtlJobFileAppender; +import com.alibaba.datax.common.log.EtlJobLogger; import com.alibaba.datax.common.spi.ErrorCode; import com.alibaba.datax.common.statistics.PerfTrace; import com.alibaba.datax.common.statistics.VMInfo; @@ -48,7 +50,7 @@ public class Engine { boolean isJob = !("taskGroup".equalsIgnoreCase(allConf .getString(CoreConstant.DATAX_CORE_CONTAINER_MODEL))); //JobContainer会在schedule后再行进行设置和调整值 - int channelNumber =0; + int channelNumber = 0; AbstractContainer container; long instanceId; int taskGroupId = -1; @@ -73,21 +75,21 @@ public class Engine { boolean perfReportEnable = allConf.getBool(CoreConstant.DATAX_CORE_REPORT_DATAX_PERFLOG, true); //standlone模式的datax shell任务不进行汇报 - if(instanceId == -1){ + if (instanceId == -1) { perfReportEnable = false; } int priority = 0; try { priority = Integer.parseInt(System.getenv("SKYNET_PRIORITY")); - }catch (NumberFormatException e){ - LOG.warn("prioriy set to 0, because NumberFormatException, the value is: "+System.getProperty("PROIORY")); + } catch (NumberFormatException e) { + LOG.warn("prioriy set to 0, because NumberFormatException, the value is: " + System.getProperty("PROIORY")); } Configuration jobInfoConfig = allConf.getConfiguration(CoreConstant.DATAX_JOB_JOBINFO); //初始化PerfTrace PerfTrace perfTrace = PerfTrace.getInstance(isJob, instanceId, taskGroupId, priority, traceEnable); - perfTrace.setJobInfo(jobInfoConfig,perfReportEnable,channelNumber); + perfTrace.setJobInfo(jobInfoConfig, perfReportEnable, channelNumber); container.start(); } @@ -101,12 +103,12 @@ public class Engine { filterSensitiveConfiguration(jobContent); - jobConfWithSetting.set("content",jobContent); + jobConfWithSetting.set("content", jobContent); return jobConfWithSetting.beautify(); } - public static Configuration filterSensitiveConfiguration(Configuration configuration){ + public static Configuration filterSensitiveConfiguration(Configuration configuration) { Set keys = configuration.getKeys(); for (final String key : keys) { boolean isSensitive = StringUtils.endsWithIgnoreCase(key, "password") @@ -134,6 +136,11 @@ public class Engine { RUNTIME_MODE = "standalone"; Configuration configuration = ConfigParser.parse(jobPath); + //取出日志文件路径 + String logFilePath = configuration.getString(CoreConstant.LOG_FILE_PATH); + LOG.info("设置日志文件路径:{}", logFilePath); + //设置日志文件路径 + EtlJobFileAppender.contextHolder.set(logFilePath); long jobId; if (!"-1".equalsIgnoreCase(jobIdString)) { @@ -159,12 +166,13 @@ public class Engine { VMInfo vmInfo = VMInfo.getVmInfo(); if (vmInfo != null) { LOG.info(vmInfo.toString()); + EtlJobLogger.log(vmInfo.toString()); } LOG.info("\n" + Engine.filterJobConfiguration(configuration) + "\n"); - + EtlJobLogger.log("\n" + Engine.filterJobConfiguration(configuration) + "\n"); LOG.debug(configuration.toJSON()); - + EtlJobLogger.log(configuration.toJSON()); ConfigurationValidate.doValidate(configuration); Engine engine = new Engine(); engine.start(configuration); @@ -173,8 +181,8 @@ public class Engine { /** * -1 表示未能解析到 jobId - * - * only for dsc & ds & datax 3 update + *

+ * only for dsc & ds & datax 3 update */ private static long parseJobIdFromUrl(List patternStringList, String url) { long result = -1; @@ -207,7 +215,7 @@ public class Engine { } catch (Throwable e) { exitCode = 1; LOG.error("\n\n经DataX智能分析,该任务最可能的错误原因是:\n" + ExceptionTracker.trace(e)); - + EtlJobLogger.log("\n\n经DataX智能分析,该任务最可能的错误原因是: {}\n", ExceptionTracker.trace(e)); if (e instanceof DataXException) { DataXException tempException = (DataXException) e; ErrorCode errorCode = tempException.getErrorCode(); @@ -223,8 +231,4 @@ public class Engine { } - - - - } diff --git a/core/src/main/java/com/alibaba/datax/core/container/util/HookInvoker.java b/core/src/main/java/com/alibaba/datax/core/container/util/HookInvoker.java index 6e0ef178..9f74f072 100755 --- a/core/src/main/java/com/alibaba/datax/core/container/util/HookInvoker.java +++ b/core/src/main/java/com/alibaba/datax/core/container/util/HookInvoker.java @@ -6,6 +6,7 @@ package com.alibaba.datax.core.container.util; import com.alibaba.datax.common.exception.CommonErrorCode; import com.alibaba.datax.common.exception.DataXException; +import com.alibaba.datax.common.log.EtlJobLogger; import com.alibaba.datax.common.spi.Hook; import com.alibaba.datax.common.util.Configuration; import com.alibaba.datax.core.util.FrameworkErrorCode; @@ -42,6 +43,7 @@ public class HookInvoker { public void invokeAll() { if (!baseDir.exists() || baseDir.isFile()) { LOG.info("No hook invoked, because base dir not exists or is a file: " + baseDir.getAbsolutePath()); + EtlJobLogger.log("No hook invoked, because base dir not exists or is a file: " + baseDir.getAbsolutePath()); return; } @@ -73,10 +75,12 @@ public class HookInvoker { } else { Hook hook = hookIt.next(); LOG.info("Invoke hook [{}], path: {}", hook.getName(), path); + EtlJobLogger.log("Invoke hook [{}], path: {}", hook.getName(), path); hook.invoke(conf, msg); } } catch (Exception e) { LOG.error("Exception when invoke hook", e); + EtlJobLogger.log(e); throw DataXException.asDataXException( CommonErrorCode.HOOK_INTERNAL_ERROR, "Exception when invoke hook", e); } finally { diff --git a/core/src/main/java/com/alibaba/datax/core/job/JobContainer.java b/core/src/main/java/com/alibaba/datax/core/job/JobContainer.java index acc552cc..fa402cc2 100755 --- a/core/src/main/java/com/alibaba/datax/core/job/JobContainer.java +++ b/core/src/main/java/com/alibaba/datax/core/job/JobContainer.java @@ -2,6 +2,7 @@ package com.alibaba.datax.core.job; import com.alibaba.datax.common.constant.PluginType; import com.alibaba.datax.common.exception.DataXException; +import com.alibaba.datax.common.log.EtlJobLogger; import com.alibaba.datax.common.plugin.AbstractJobPlugin; import com.alibaba.datax.common.plugin.JobPluginCollector; import com.alibaba.datax.common.spi.Reader; @@ -95,14 +96,15 @@ public class JobContainer extends AbstractContainer { @Override public void start() { LOG.info("DataX jobContainer starts job."); - + EtlJobLogger.log("DataX jobContainer starts job."); boolean hasException = false; boolean isDryRun = false; try { this.startTimeStamp = System.currentTimeMillis(); isDryRun = configuration.getBool(CoreConstant.DATAX_JOB_SETTING_DRYRUN, false); - if(isDryRun) { + if (isDryRun) { LOG.info("jobContainer starts to do preCheck ..."); + EtlJobLogger.log("jobContainer starts to do preCheck ..."); this.preCheck(); } else { userConf = configuration.clone(); @@ -112,10 +114,13 @@ public class JobContainer extends AbstractContainer { LOG.debug("jobContainer starts to do init ..."); this.init(); LOG.info("jobContainer starts to do prepare ..."); + EtlJobLogger.log("jobContainer starts to do prepare ..."); this.prepare(); LOG.info("jobContainer starts to do split ..."); + EtlJobLogger.log("jobContainer starts to do split ..."); this.totalStage = this.split(); LOG.info("jobContainer starts to do schedule ..."); + EtlJobLogger.log("jobContainer starts to do schedule ..."); this.schedule(); LOG.debug("jobContainer starts to do post ..."); this.post(); @@ -128,7 +133,7 @@ public class JobContainer extends AbstractContainer { } } catch (Throwable e) { LOG.error("Exception when job run", e); - + EtlJobLogger.log(e); hasException = true; if (e instanceof OutOfMemoryError) { @@ -162,7 +167,7 @@ public class JobContainer extends AbstractContainer { throw DataXException.asDataXException( FrameworkErrorCode.RUNTIME_ERROR, e); } finally { - if(!isDryRun) { + if (!isDryRun) { this.destroy(); this.endTimeStamp = System.currentTimeMillis(); @@ -172,9 +177,11 @@ public class JobContainer extends AbstractContainer { if (vmInfo != null) { vmInfo.getDelta(false); LOG.info(vmInfo.totalString()); + EtlJobLogger.log(vmInfo.totalString()); } LOG.info(PerfTrace.getInstance().summarizeNoException()); + EtlJobLogger.log(PerfTrace.getInstance().summarizeNoException()); this.logStatistics(); } } @@ -191,6 +198,7 @@ public class JobContainer extends AbstractContainer { this.preCheckReader(); this.preCheckWriter(); LOG.info("PreCheck通过"); + EtlJobLogger.log("PreCheck通过"); } private void preCheckInit() { @@ -199,6 +207,7 @@ public class JobContainer extends AbstractContainer { if (this.jobId < 0) { LOG.info("Set jobId = 0"); + EtlJobLogger.log("Set jobId = 0"); this.jobId = 0; this.configuration.set(CoreConstant.DATAX_CORE_CONTAINER_JOB_ID, this.jobId); @@ -268,6 +277,8 @@ public class JobContainer extends AbstractContainer { PluginType.READER, this.readerPluginName)); LOG.info(String.format("DataX Reader.Job [%s] do preCheck work .", this.readerPluginName)); + EtlJobLogger.log(String.format("DataX Reader.Job [%s] do preCheck work .", + this.readerPluginName)); this.jobReader.preCheck(); classLoaderSwapper.restoreCurrentThreadClassLoader(); } @@ -277,6 +288,8 @@ public class JobContainer extends AbstractContainer { PluginType.WRITER, this.writerPluginName)); LOG.info(String.format("DataX Writer.Job [%s] do preCheck work .", this.writerPluginName)); + EtlJobLogger.log(String.format("DataX Writer.Job [%s] do preCheck work .", + this.writerPluginName)); this.jobWriter.preCheck(); classLoaderSwapper.restoreCurrentThreadClassLoader(); } @@ -290,6 +303,7 @@ public class JobContainer extends AbstractContainer { if (this.jobId < 0) { LOG.info("Set jobId = 0"); + EtlJobLogger.log("Set jobId = 0"); this.jobId = 0; this.configuration.set(CoreConstant.DATAX_CORE_CONTAINER_JOB_ID, this.jobId); @@ -312,7 +326,7 @@ public class JobContainer extends AbstractContainer { private void preHandle() { String handlerPluginTypeStr = this.configuration.getString( CoreConstant.DATAX_JOB_PREHANDLER_PLUGINTYPE); - if(!StringUtils.isNotEmpty(handlerPluginTypeStr)){ + if (!StringUtils.isNotEmpty(handlerPluginTypeStr)) { return; } PluginType handlerPluginType; @@ -342,13 +356,14 @@ public class JobContainer extends AbstractContainer { classLoaderSwapper.restoreCurrentThreadClassLoader(); LOG.info("After PreHandler: \n" + Engine.filterJobConfiguration(configuration) + "\n"); + EtlJobLogger.log("After PreHandler: \n" + Engine.filterJobConfiguration(configuration) + "\n"); } private void postHandle() { String handlerPluginTypeStr = this.configuration.getString( CoreConstant.DATAX_JOB_POSTHANDLER_PLUGINTYPE); - if(!StringUtils.isNotEmpty(handlerPluginTypeStr)){ + if (!StringUtils.isNotEmpty(handlerPluginTypeStr)) { return; } PluginType handlerPluginType; @@ -398,7 +413,7 @@ public class JobContainer extends AbstractContainer { List transformerList = this.configuration.getListConfiguration(CoreConstant.DATAX_JOB_CONTENT_TRANSFORMER); - LOG.debug("transformer configuration: "+ JSON.toJSONString(transformerList)); + LOG.debug("transformer configuration: " + JSON.toJSONString(transformerList)); /** * 输入是reader和writer的parameter list,输出是content下面元素的list */ @@ -406,7 +421,8 @@ public class JobContainer extends AbstractContainer { readerTaskConfigs, writerTaskConfigs, transformerList); - LOG.debug("contentConfig configuration: "+ JSON.toJSONString(contentConfig)); + LOG.debug("contentConfig configuration: " + JSON.toJSONString(contentConfig)); + EtlJobLogger.log("contentConfig configuration: " + JSON.toJSONString(contentConfig)); this.configuration.set(CoreConstant.DATAX_JOB_CONTENT, contentConfig); @@ -437,6 +453,7 @@ public class JobContainer extends AbstractContainer { needChannelNumberByByte = needChannelNumberByByte > 0 ? needChannelNumberByByte : 1; LOG.info("Job set Max-Byte-Speed to " + globalLimitedByteSpeed + " bytes."); + EtlJobLogger.log("Job set Max-Byte-Speed to " + globalLimitedByteSpeed + " bytes."); } boolean isRecordLimit = (this.configuration.getInt( @@ -457,6 +474,7 @@ public class JobContainer extends AbstractContainer { needChannelNumberByRecord = needChannelNumberByRecord > 0 ? needChannelNumberByRecord : 1; LOG.info("Job set Max-Record-Speed to " + globalLimitedRecordSpeed + " records."); + EtlJobLogger.log("Job set Max-Record-Speed to " + globalLimitedRecordSpeed + " records."); } // 取较小值 @@ -476,7 +494,8 @@ public class JobContainer extends AbstractContainer { LOG.info("Job set Channel-Number to " + this.needChannelNumber + " channels."); - + EtlJobLogger.log("Job set Channel-Number to " + this.needChannelNumber + + " channels."); return; } @@ -509,11 +528,12 @@ public class JobContainer extends AbstractContainer { this.needChannelNumber, channelsPerTaskGroup); LOG.info("Scheduler starts [{}] taskGroups.", taskGroupConfigs.size()); + EtlJobLogger.log("Scheduler starts [{}] taskGroups.", taskGroupConfigs.size()); ExecuteMode executeMode = null; AbstractScheduler scheduler; try { - executeMode = ExecuteMode.STANDALONE; + executeMode = ExecuteMode.STANDALONE; scheduler = initStandaloneScheduler(this.configuration); //设置 executeMode @@ -529,6 +549,7 @@ public class JobContainer extends AbstractContainer { } LOG.info("Running by {} Mode.", executeMode); + EtlJobLogger.log("Running by {} Mode.", executeMode); this.startTransferTimeStamp = System.currentTimeMillis(); @@ -537,6 +558,7 @@ public class JobContainer extends AbstractContainer { this.endTransferTimeStamp = System.currentTimeMillis(); } catch (Exception e) { LOG.error("运行scheduler 模式[{}]出错.", executeMode); + EtlJobLogger.log("运行scheduler 模式[{}]出错.", executeMode); this.endTransferTimeStamp = System.currentTimeMillis(); throw DataXException.asDataXException( FrameworkErrorCode.RUNTIME_ERROR, e); @@ -603,8 +625,7 @@ public class JobContainer extends AbstractContainer { super.getContainerCommunicator().report(reportCommunication); - - LOG.info(String.format( + String log = String.format( "\n" + "%-26s: %-18s\n" + "%-26s: %-18s\n" + "%-26s: %19s\n" + "%-26s: %19s\n" + "%-26s: %19s\n" + "%-26s: %19s\n" + "%-26s: %19s\n", @@ -625,12 +646,14 @@ public class JobContainer extends AbstractContainer { String.valueOf(CommunicationTool.getTotalReadRecords(communication)), "读写失败总数", String.valueOf(CommunicationTool.getTotalErrorRecords(communication)) - )); + ); + LOG.info(log); + EtlJobLogger.log(log); if (communication.getLongCounter(CommunicationTool.TRANSFORMER_SUCCEED_RECORDS) > 0 || communication.getLongCounter(CommunicationTool.TRANSFORMER_FAILED_RECORDS) > 0 || communication.getLongCounter(CommunicationTool.TRANSFORMER_FILTER_RECORDS) > 0) { - LOG.info(String.format( + log = String.format( "\n" + "%-26s: %19s\n" + "%-26s: %19s\n" + "%-26s: %19s\n", "Transformer成功记录总数", communication.getLongCounter(CommunicationTool.TRANSFORMER_SUCCEED_RECORDS), @@ -640,7 +663,9 @@ public class JobContainer extends AbstractContainer { "Transformer过滤记录总数", communication.getLongCounter(CommunicationTool.TRANSFORMER_FILTER_RECORDS) - )); + ); + LOG.info(log); + EtlJobLogger.log(log); } @@ -712,6 +737,8 @@ public class JobContainer extends AbstractContainer { PluginType.READER, this.readerPluginName)); LOG.info(String.format("DataX Reader.Job [%s] do prepare work .", this.readerPluginName)); + EtlJobLogger.log(String.format("DataX Reader.Job [%s] do prepare work .", + this.readerPluginName)); this.jobReader.prepare(); classLoaderSwapper.restoreCurrentThreadClassLoader(); } @@ -721,6 +748,8 @@ public class JobContainer extends AbstractContainer { PluginType.WRITER, this.writerPluginName)); LOG.info(String.format("DataX Writer.Job [%s] do prepare work .", this.writerPluginName)); + EtlJobLogger.log(String.format("DataX Writer.Job [%s] do prepare work .", + this.writerPluginName)); this.jobWriter.prepare(); classLoaderSwapper.restoreCurrentThreadClassLoader(); } @@ -738,6 +767,8 @@ public class JobContainer extends AbstractContainer { } LOG.info("DataX Reader.Job [{}] splits to [{}] tasks.", this.readerPluginName, readerSlicesConfigs.size()); + EtlJobLogger.log("DataX Reader.Job [{}] splits to [{}] tasks.", + this.readerPluginName, readerSlicesConfigs.size()); classLoaderSwapper.restoreCurrentThreadClassLoader(); return readerSlicesConfigs; } @@ -755,6 +786,8 @@ public class JobContainer extends AbstractContainer { } LOG.info("DataX Writer.Job [{}] splits to [{}] tasks.", this.writerPluginName, writerSlicesConfigs.size()); + EtlJobLogger.log("DataX Writer.Job [{}] splits to [{}] tasks.", + this.writerPluginName, writerSlicesConfigs.size()); classLoaderSwapper.restoreCurrentThreadClassLoader(); return writerSlicesConfigs; @@ -797,7 +830,7 @@ public class JobContainer extends AbstractContainer { taskConfig.set(CoreConstant.JOB_WRITER_PARAMETER, writerTasksConfigs.get(i)); - if(transformerConfigs!=null && transformerConfigs.size()>0){ + if (transformerConfigs != null && transformerConfigs.size() > 0) { taskConfig.set(CoreConstant.JOB_TRANSFORMER, transformerConfigs); } @@ -941,6 +974,8 @@ public class JobContainer extends AbstractContainer { PluginType.READER, this.readerPluginName)); LOG.info("DataX Reader.Job [{}] do post work.", this.readerPluginName); + EtlJobLogger.log("DataX Reader.Job [{}] do post work.", + this.readerPluginName); this.jobReader.post(); classLoaderSwapper.restoreCurrentThreadClassLoader(); } @@ -950,6 +985,8 @@ public class JobContainer extends AbstractContainer { PluginType.WRITER, this.writerPluginName)); LOG.info("DataX Writer.Job [{}] do post work.", this.writerPluginName); + EtlJobLogger.log("DataX Writer.Job [{}] do post work.", + this.writerPluginName); this.jobWriter.post(); classLoaderSwapper.restoreCurrentThreadClassLoader(); } diff --git a/core/src/main/java/com/alibaba/datax/core/job/scheduler/AbstractScheduler.java b/core/src/main/java/com/alibaba/datax/core/job/scheduler/AbstractScheduler.java index ab2b5aa3..87ae1548 100755 --- a/core/src/main/java/com/alibaba/datax/core/job/scheduler/AbstractScheduler.java +++ b/core/src/main/java/com/alibaba/datax/core/job/scheduler/AbstractScheduler.java @@ -1,6 +1,7 @@ package com.alibaba.datax.core.job.scheduler; import com.alibaba.datax.common.exception.DataXException; +import com.alibaba.datax.common.log.EtlJobLogger; import com.alibaba.datax.common.util.Configuration; import com.alibaba.datax.core.statistics.communication.Communication; import com.alibaba.datax.core.statistics.communication.CommunicationTool; @@ -90,6 +91,7 @@ public abstract class AbstractScheduler { if (nowJobContainerCommunication.getState() == State.SUCCEEDED) { LOG.info("Scheduler accomplished all tasks."); + EtlJobLogger.log("Scheduler accomplished all tasks."); break; } diff --git a/core/src/main/java/com/alibaba/datax/core/statistics/container/communicator/job/StandAloneJobContainerCommunicator.java b/core/src/main/java/com/alibaba/datax/core/statistics/container/communicator/job/StandAloneJobContainerCommunicator.java index 7ace8118..2852334a 100755 --- a/core/src/main/java/com/alibaba/datax/core/statistics/container/communicator/job/StandAloneJobContainerCommunicator.java +++ b/core/src/main/java/com/alibaba/datax/core/statistics/container/communicator/job/StandAloneJobContainerCommunicator.java @@ -1,5 +1,6 @@ package com.alibaba.datax.core.statistics.container.communicator.job; +import com.alibaba.datax.common.log.EtlJobLogger; import com.alibaba.datax.common.util.Configuration; import com.alibaba.datax.core.statistics.communication.Communication; import com.alibaba.datax.core.statistics.communication.CommunicationTool; @@ -48,6 +49,7 @@ public class StandAloneJobContainerCommunicator extends AbstractContainerCommuni super.getReporter().reportJobCommunication(super.getJobId(), communication); LOG.info(CommunicationTool.Stringify.getSnapshot(communication)); + EtlJobLogger.log(CommunicationTool.Stringify.getSnapshot(communication)); reportVmInfo(); } diff --git a/core/src/main/java/com/alibaba/datax/core/statistics/plugin/task/StdoutPluginCollector.java b/core/src/main/java/com/alibaba/datax/core/statistics/plugin/task/StdoutPluginCollector.java index 8b2a8378..480b82c6 100755 --- a/core/src/main/java/com/alibaba/datax/core/statistics/plugin/task/StdoutPluginCollector.java +++ b/core/src/main/java/com/alibaba/datax/core/statistics/plugin/task/StdoutPluginCollector.java @@ -2,12 +2,12 @@ package com.alibaba.datax.core.statistics.plugin.task; import com.alibaba.datax.common.constant.PluginType; import com.alibaba.datax.common.element.Record; +import com.alibaba.datax.common.log.EtlJobLogger; import com.alibaba.datax.common.util.Configuration; import com.alibaba.datax.core.statistics.communication.Communication; -import com.alibaba.datax.core.util.container.CoreConstant; import com.alibaba.datax.core.statistics.plugin.task.util.DirtyRecord; +import com.alibaba.datax.core.util.container.CoreConstant; import com.alibaba.fastjson.JSON; - import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -63,10 +63,13 @@ public class StdoutPluginCollector extends AbstractTaskPluginCollector { int logNum = currentLogNum.getAndIncrement(); if(logNum==0 && t!=null){ LOG.error("", t); + EtlJobLogger.log(t); } if (maxLogNum.intValue() < 0 || currentLogNum.intValue() < maxLogNum.intValue()) { LOG.error("脏数据: \n" + this.formatDirty(dirtyRecord, t, errorMessage)); + EtlJobLogger.log("脏数据: \n" + + this.formatDirty(dirtyRecord, t, errorMessage)); } super.collectDirtyRecord(dirtyRecord, t, errorMessage); diff --git a/core/src/main/java/com/alibaba/datax/core/taskgroup/TaskGroupContainer.java b/core/src/main/java/com/alibaba/datax/core/taskgroup/TaskGroupContainer.java index c30c94d9..0f5fb854 100755 --- a/core/src/main/java/com/alibaba/datax/core/taskgroup/TaskGroupContainer.java +++ b/core/src/main/java/com/alibaba/datax/core/taskgroup/TaskGroupContainer.java @@ -3,6 +3,7 @@ package com.alibaba.datax.core.taskgroup; import com.alibaba.datax.common.constant.PluginType; import com.alibaba.datax.common.exception.CommonErrorCode; import com.alibaba.datax.common.exception.DataXException; +import com.alibaba.datax.common.log.EtlJobLogger; import com.alibaba.datax.common.plugin.RecordSender; import com.alibaba.datax.common.plugin.TaskPluginCollector; import com.alibaba.datax.common.statistics.PerfRecord; @@ -131,7 +132,9 @@ public class TaskGroupContainer extends AbstractContainer { LOG.info(String.format( "taskGroupId=[%d] start [%d] channels for [%d] tasks.", this.taskGroupId, channelNumber, taskCountInThisTaskGroup)); - + EtlJobLogger.log(String.format( + "taskGroupId=[%d] start [%d] channels for [%d] tasks.", + this.taskGroupId, channelNumber, taskCountInThisTaskGroup)); this.containerCommunicator.registerCommunication(taskConfigs); Map taskConfigMap = buildTaskConfigMap(taskConfigs); //taskId与task配置 @@ -180,6 +183,8 @@ public class TaskGroupContainer extends AbstractContainer { LOG.info("taskGroup[{}] taskId[{}] is successed, used[{}]ms", this.taskGroupId, taskId, usedTime); //usedTime*1000*1000 转换成PerfRecord记录的ns,这里主要是简单登记,进行最长任务的打印。因此增加特定静态方法 + EtlJobLogger.log("taskGroup[{}] taskId[{}] is successed, used[{}]ms", + this.taskGroupId, taskId, usedTime); PerfRecord.addPerfRecord(taskGroupId, taskId, PerfRecord.PHASE.TASK_TOTAL,taskStartTime, usedTime * 1000L * 1000L); taskStartTimeMap.remove(taskId); taskConfigMap.remove(taskId); @@ -222,6 +227,8 @@ public class TaskGroupContainer extends AbstractContainer { }else{ LOG.info("taskGroup[{}] taskId[{}] attemptCount[{}] has already shutdown", this.taskGroupId, taskId, lastExecutor.getAttemptCount()); + EtlJobLogger.log("taskGroup[{}] taskId[{}] attemptCount[{}] has already shutdown", + this.taskGroupId, taskId, lastExecutor.getAttemptCount()); } } Configuration taskConfigForRun = taskMaxRetryTimes > 1 ? taskConfig.clone() : taskConfig; @@ -238,6 +245,8 @@ public class TaskGroupContainer extends AbstractContainer { taskFailedExecutorMap.remove(taskId); LOG.info("taskGroup[{}] taskId[{}] attemptCount[{}] is started", this.taskGroupId, taskId, attemptCount); + EtlJobLogger.log("taskGroup[{}] taskId[{}] attemptCount[{}] is started", + this.taskGroupId, taskId, attemptCount); } //4.任务列表为空,executor已结束, 搜集状态为success--->成功 @@ -247,6 +256,7 @@ public class TaskGroupContainer extends AbstractContainer { lastTaskGroupContainerCommunication, taskCountInThisTaskGroup); LOG.info("taskGroup[{}] completed it's tasks.", this.taskGroupId); + EtlJobLogger.log("taskGroup[{}] completed it's tasks.", this.taskGroupId); break; } @@ -290,9 +300,11 @@ public class TaskGroupContainer extends AbstractContainer { if (vmInfo != null) { vmInfo.getDelta(false); LOG.info(vmInfo.totalString()); + EtlJobLogger.log(vmInfo.totalString()); } LOG.info(PerfTrace.getInstance().summarizeNoException()); + EtlJobLogger.log(PerfTrace.getInstance().summarizeNoException()); } } } diff --git a/core/src/main/java/com/alibaba/datax/core/taskgroup/runner/ReaderRunner.java b/core/src/main/java/com/alibaba/datax/core/taskgroup/runner/ReaderRunner.java index 91961d8d..270f657b 100755 --- a/core/src/main/java/com/alibaba/datax/core/taskgroup/runner/ReaderRunner.java +++ b/core/src/main/java/com/alibaba/datax/core/taskgroup/runner/ReaderRunner.java @@ -1,5 +1,6 @@ package com.alibaba.datax.core.taskgroup.runner; +import com.alibaba.datax.common.log.EtlJobLogger; import com.alibaba.datax.common.plugin.AbstractTaskPlugin; import com.alibaba.datax.common.plugin.RecordSender; import com.alibaba.datax.common.spi.Reader; @@ -71,6 +72,7 @@ public class ReaderRunner extends AbstractRunner implements Runnable { } catch (Throwable e) { LOG.error("Reader runner Received Exceptions:", e); super.markFail(e); + EtlJobLogger.log(e); } finally { LOG.debug("task reader starts to do destroy ..."); PerfRecord desPerfRecord = new PerfRecord(getTaskGroupId(), getTaskId(), PerfRecord.PHASE.READ_TASK_DESTROY); diff --git a/core/src/main/java/com/alibaba/datax/core/transport/channel/Channel.java b/core/src/main/java/com/alibaba/datax/core/transport/channel/Channel.java index 8d4f1f67..7d926648 100755 --- a/core/src/main/java/com/alibaba/datax/core/transport/channel/Channel.java +++ b/core/src/main/java/com/alibaba/datax/core/transport/channel/Channel.java @@ -1,6 +1,7 @@ package com.alibaba.datax.core.transport.channel; import com.alibaba.datax.common.element.Record; +import com.alibaba.datax.common.log.EtlJobLogger; import com.alibaba.datax.common.util.Configuration; import com.alibaba.datax.core.statistics.communication.Communication; import com.alibaba.datax.core.statistics.communication.CommunicationTool; @@ -57,6 +58,8 @@ public abstract class Channel { CoreConstant.DATAX_CORE_TRANSPORT_CHANNEL_SPEED_RECORD, 10000); if (capacity <= 0) { + EtlJobLogger.log(String.format( + "通道容量[%d]必须大于0.", capacity)); throw new IllegalArgumentException(String.format( "通道容量[%d]必须大于0.", capacity)); } diff --git a/core/src/main/java/com/alibaba/datax/core/util/ConfigParser.java b/core/src/main/java/com/alibaba/datax/core/util/ConfigParser.java index 20039864..6c792079 100755 --- a/core/src/main/java/com/alibaba/datax/core/util/ConfigParser.java +++ b/core/src/main/java/com/alibaba/datax/core/util/ConfigParser.java @@ -1,6 +1,7 @@ package com.alibaba.datax.core.util; import com.alibaba.datax.common.exception.DataXException; +import com.alibaba.datax.common.log.EtlJobLogger; import com.alibaba.datax.common.util.Configuration; import com.alibaba.datax.core.util.container.CoreConstant; import org.apache.commons.io.FileUtils; @@ -55,6 +56,7 @@ public final class ConfigParser { }catch (Exception e){ //吞掉异常,保持log干净。这里message足够。 LOG.warn(String.format("插件[%s,%s]加载失败,1s后重试... Exception:%s ", readerPluginName, writerPluginName, e.getMessage())); + EtlJobLogger.log(String.format("插件[%s,%s]加载失败,1s后重试... Exception:%s ", readerPluginName, writerPluginName, e.getMessage())); try { Thread.sleep(1000); } catch (InterruptedException e1) { diff --git a/core/src/main/java/com/alibaba/datax/core/util/TransformerUtil.java b/core/src/main/java/com/alibaba/datax/core/util/TransformerUtil.java index 1b469623..f0caa3e8 100644 --- a/core/src/main/java/com/alibaba/datax/core/util/TransformerUtil.java +++ b/core/src/main/java/com/alibaba/datax/core/util/TransformerUtil.java @@ -1,6 +1,7 @@ package com.alibaba.datax.core.util; import com.alibaba.datax.common.exception.DataXException; +import com.alibaba.datax.common.log.EtlJobLogger; import com.alibaba.datax.common.util.Configuration; import com.alibaba.datax.core.transport.transformer.*; import com.alibaba.datax.core.util.container.CoreConstant; @@ -47,6 +48,7 @@ public class TransformerUtil { * 延迟load 第三方插件的function,并按需load */ LOG.info(String.format(" user config tranformers [%s], loading...", functionNames)); + EtlJobLogger.log(String.format(" user config tranformers [%s], loading...", functionNames)); TransformerRegistry.loadTransformerFromLocalStorage(functionNames); int i = 0; @@ -99,6 +101,9 @@ public class TransformerUtil { LOG.info(String.format(" %s of transformer init success. name=%s, isNative=%s parameter = %s" , i, transformerInfo.getTransformer().getTransformerName() , transformerInfo.isNative(), configuration.getConfiguration("parameter"))); + EtlJobLogger.log(String.format(" %s of transformer init success. name=%s, isNative=%s parameter = %s" + , i, transformerInfo.getTransformer().getTransformerName() + , transformerInfo.isNative(), configuration.getConfiguration("parameter"))); } return result; diff --git a/core/src/main/java/com/alibaba/datax/core/util/container/CoreConstant.java b/core/src/main/java/com/alibaba/datax/core/util/container/CoreConstant.java index a31f1f6a..72f09fd4 100755 --- a/core/src/main/java/com/alibaba/datax/core/util/container/CoreConstant.java +++ b/core/src/main/java/com/alibaba/datax/core/util/container/CoreConstant.java @@ -149,6 +149,8 @@ public class CoreConstant { public static final String CURRENT_SERVICE_PASSWORD = "current.service.password"; + public static String LOG_FILE_PATH = "logFilePath"; + // ----------------------------- 环境变量 --------------------------------- /** diff --git a/plugin-rdbms-util/src/main/java/com/alibaba/datax/plugin/rdbms/reader/CommonRdbmsReader.java b/plugin-rdbms-util/src/main/java/com/alibaba/datax/plugin/rdbms/reader/CommonRdbmsReader.java index f3180402..eb77cb77 100755 --- a/plugin-rdbms-util/src/main/java/com/alibaba/datax/plugin/rdbms/reader/CommonRdbmsReader.java +++ b/plugin-rdbms-util/src/main/java/com/alibaba/datax/plugin/rdbms/reader/CommonRdbmsReader.java @@ -1,13 +1,8 @@ package com.alibaba.datax.plugin.rdbms.reader; -import com.alibaba.datax.common.element.BoolColumn; -import com.alibaba.datax.common.element.BytesColumn; -import com.alibaba.datax.common.element.DateColumn; -import com.alibaba.datax.common.element.DoubleColumn; -import com.alibaba.datax.common.element.LongColumn; -import com.alibaba.datax.common.element.Record; -import com.alibaba.datax.common.element.StringColumn; +import com.alibaba.datax.common.element.*; import com.alibaba.datax.common.exception.DataXException; +import com.alibaba.datax.common.log.EtlJobLogger; import com.alibaba.datax.common.plugin.RecordSender; import com.alibaba.datax.common.plugin.TaskPluginCollector; import com.alibaba.datax.common.statistics.PerfRecord; @@ -22,7 +17,6 @@ import com.alibaba.datax.plugin.rdbms.util.DBUtilErrorCode; import com.alibaba.datax.plugin.rdbms.util.DataBaseType; import com.alibaba.datax.plugin.rdbms.util.RdbmsException; import com.google.common.collect.Lists; - import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -56,6 +50,8 @@ public class CommonRdbmsReader { LOG.debug("After job init(), job config now is:[\n{}\n]", originalConfig.toJSON()); + EtlJobLogger.log("After job init(), job config now is:[\n{}\n]", + originalConfig.toJSON()); } public void preCheck(Configuration originalConfig,DataBaseType dataBaseType) { @@ -158,9 +154,11 @@ public class CommonRdbmsReader { DBUtilErrorCode.JDBC_OB10_ADDRESS_ERROR, "JDBC OB10格式错误,请联系askdatax"); } LOG.info("this is ob1_0 jdbc url."); + EtlJobLogger.log("this is ob1_0 jdbc url."); this.username = ss[1].trim() +":"+this.username; this.jdbcUrl = ss[2]; LOG.info("this is ob1_0 jdbc url. user=" + this.username + " :url=" + this.jdbcUrl); + EtlJobLogger.log("this is ob1_0 jdbc url. user=" + this.username + " :url=" + this.jdbcUrl); } this.mandatoryEncoding = readerSliceConfig.getString(Key.MANDATORY_ENCODING, ""); @@ -179,6 +177,8 @@ public class CommonRdbmsReader { LOG.info("Begin to read record by Sql: [{}\n] {}.", querySql, basicMsg); + EtlJobLogger.log("Begin to read record by Sql: [{}\n] {}.", + querySql, basicMsg); PerfRecord queryPerfRecord = new PerfRecord(taskGroupId,taskId, PerfRecord.PHASE.SQL_QUERY); queryPerfRecord.start(); @@ -215,6 +215,8 @@ public class CommonRdbmsReader { //目前大盘是依赖这个打印,而之前这个Finish read record是包含了sql查询和result next的全部时间 LOG.info("Finished read record by Sql: [{}\n] {}.", querySql, basicMsg); + EtlJobLogger.log("Finished read record by Sql: [{}\n] {}.", + querySql, basicMsg); }catch (Exception e) { throw RdbmsException.asQueryException(this.dataBaseType, e, querySql, table, username); @@ -340,6 +342,7 @@ public class CommonRdbmsReader { LOG.debug("read data " + record.toString() + " occur exception:", e); } + EtlJobLogger.log(e); //TODO 这里识别为脏数据靠谱吗? taskPluginCollector.collectDirtyRecord(record, e); if (e instanceof DataXException) { diff --git a/plugin-rdbms-util/src/main/java/com/alibaba/datax/plugin/rdbms/reader/ResultSetReadProxy.java b/plugin-rdbms-util/src/main/java/com/alibaba/datax/plugin/rdbms/reader/ResultSetReadProxy.java index 9fe765c6..b51ecbcf 100755 --- a/plugin-rdbms-util/src/main/java/com/alibaba/datax/plugin/rdbms/reader/ResultSetReadProxy.java +++ b/plugin-rdbms-util/src/main/java/com/alibaba/datax/plugin/rdbms/reader/ResultSetReadProxy.java @@ -2,10 +2,10 @@ package com.alibaba.datax.plugin.rdbms.reader; import com.alibaba.datax.common.element.*; import com.alibaba.datax.common.exception.DataXException; +import com.alibaba.datax.common.log.EtlJobLogger; import com.alibaba.datax.common.plugin.RecordSender; import com.alibaba.datax.common.plugin.TaskPluginCollector; import com.alibaba.datax.plugin.rdbms.util.DBUtilErrorCode; - import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -126,6 +126,7 @@ public class ResultSetReadProxy { LOG.debug("read data " + record.toString() + " occur exception:", e); } + EtlJobLogger.log(e); //TODO 这里识别为脏数据靠谱吗? taskPluginCollector.collectDirtyRecord(record, e); diff --git a/plugin-rdbms-util/src/main/java/com/alibaba/datax/plugin/rdbms/reader/util/HintUtil.java b/plugin-rdbms-util/src/main/java/com/alibaba/datax/plugin/rdbms/reader/util/HintUtil.java index 4e6827cf..75afe5b0 100644 --- a/plugin-rdbms-util/src/main/java/com/alibaba/datax/plugin/rdbms/reader/util/HintUtil.java +++ b/plugin-rdbms-util/src/main/java/com/alibaba/datax/plugin/rdbms/reader/util/HintUtil.java @@ -1,5 +1,6 @@ package com.alibaba.datax.plugin.rdbms.reader.util; +import com.alibaba.datax.common.log.EtlJobLogger; import com.alibaba.datax.common.util.Configuration; import com.alibaba.datax.plugin.rdbms.reader.Constant; import com.alibaba.datax.plugin.rdbms.reader.Key; @@ -52,14 +53,17 @@ public class HintUtil { //主库不并发读取 if(finalHint.indexOf("parallel") > 0 && DBUtil.isOracleMaster(jdbcUrl, username, password)){ LOG.info("master:{} will not use hint:{}", jdbcUrl, finalHint); + EtlJobLogger.log("master:{} will not use hint:{}", jdbcUrl, finalHint); }else{ LOG.info("table:{} use hint:{}.", table, finalHint); + EtlJobLogger.log("table:{} use hint:{}.", table, finalHint); return finalHint + column; } } } } catch (Exception e){ LOG.warn("match hint exception, will not use hint", e); + EtlJobLogger.log(e); } return column; } diff --git a/plugin-rdbms-util/src/main/java/com/alibaba/datax/plugin/rdbms/reader/util/OriginalConfPretreatmentUtil.java b/plugin-rdbms-util/src/main/java/com/alibaba/datax/plugin/rdbms/reader/util/OriginalConfPretreatmentUtil.java index 3ac5f2af..e4b8ef63 100755 --- a/plugin-rdbms-util/src/main/java/com/alibaba/datax/plugin/rdbms/reader/util/OriginalConfPretreatmentUtil.java +++ b/plugin-rdbms-util/src/main/java/com/alibaba/datax/plugin/rdbms/reader/util/OriginalConfPretreatmentUtil.java @@ -1,6 +1,7 @@ package com.alibaba.datax.plugin.rdbms.reader.util; import com.alibaba.datax.common.exception.DataXException; +import com.alibaba.datax.common.log.EtlJobLogger; import com.alibaba.datax.common.util.Configuration; import com.alibaba.datax.common.util.ListUtil; import com.alibaba.datax.plugin.rdbms.reader.Constant; @@ -100,6 +101,7 @@ public final class OriginalConfPretreatmentUtil { i, Key.JDBC_URL), jdbcUrl); LOG.info("Available jdbcUrl:{}.",jdbcUrl); + EtlJobLogger.log("Available jdbcUrl:{}.", jdbcUrl); if (isTableMode) { // table 方式 @@ -144,6 +146,7 @@ public final class OriginalConfPretreatmentUtil { if (1 == userConfiguredColumns.size() && "*".equals(userConfiguredColumns.get(0))) { LOG.warn("您的配置文件中的列配置存在一定的风险. 因为您未配置读取数据库表的列,当您的表字段个数、类型有变动时,可能影响任务正确性甚至会运行出错。请检查您的配置并作出修改."); + EtlJobLogger.log("您的配置文件中的列配置存在一定的风险. 因为您未配置读取数据库表的列,当您的表字段个数、类型有变动时,可能影响任务正确性甚至会运行出错。请检查您的配置并作出修改."); // 回填其值,需要以 String 的方式转交后续处理 originalConfig.set(Key.COLUMN, "*"); } else { @@ -161,6 +164,8 @@ public final class OriginalConfPretreatmentUtil { tableName); LOG.info("table:[{}] has columns:[{}].", tableName, StringUtils.join(allColumns, ",")); + EtlJobLogger.log("table:[{}] has columns:[{}].", + tableName, StringUtils.join(allColumns, ",")); // warn:注意mysql表名区分大小写 allColumns = ListUtil.valueToLowerCase(allColumns); List quotedColumns = new ArrayList(); @@ -203,6 +208,7 @@ public final class OriginalConfPretreatmentUtil { if (null != userConfiguredColumns && userConfiguredColumns.size() > 0) { LOG.warn("您的配置有误. 由于您读取数据库表采用了querySql的方式, 所以您不需要再配置 column. 如果您不想看到这条提醒,请移除您源头表中配置中的 column."); + EtlJobLogger.log("您的配置有误. 由于您读取数据库表采用了querySql的方式, 所以您不需要再配置 column. 如果您不想看到这条提醒,请移除您源头表中配置中的 column."); originalConfig.remove(Key.COLUMN); } @@ -210,6 +216,7 @@ public final class OriginalConfPretreatmentUtil { String where = originalConfig.getString(Key.WHERE, null); if (StringUtils.isNotBlank(where)) { LOG.warn("您的配置有误. 由于您读取数据库表采用了querySql的方式, 所以您不需要再配置 where. 如果您不想看到这条提醒,请移除您源头表中配置中的 where."); + EtlJobLogger.log("您的配置有误. 由于您读取数据库表采用了querySql的方式, 所以您不需要再配置 where. 如果您不想看到这条提醒,请移除您源头表中配置中的 where."); originalConfig.remove(Key.WHERE); } @@ -217,6 +224,7 @@ public final class OriginalConfPretreatmentUtil { String splitPk = originalConfig.getString(Key.SPLIT_PK, null); if (StringUtils.isNotBlank(splitPk)) { LOG.warn("您的配置有误. 由于您读取数据库表采用了querySql的方式, 所以您不需要再配置 splitPk. 如果您不想看到这条提醒,请移除您源头表中配置中的 splitPk."); + EtlJobLogger.log("您的配置有误. 由于您读取数据库表采用了querySql的方式, 所以您不需要再配置 splitPk. 如果您不想看到这条提醒,请移除您源头表中配置中的 splitPk."); originalConfig.remove(Key.SPLIT_PK); } } diff --git a/plugin-rdbms-util/src/main/java/com/alibaba/datax/plugin/rdbms/reader/util/SingleTableSplitUtil.java b/plugin-rdbms-util/src/main/java/com/alibaba/datax/plugin/rdbms/reader/util/SingleTableSplitUtil.java index d9846b39..150906ee 100755 --- a/plugin-rdbms-util/src/main/java/com/alibaba/datax/plugin/rdbms/reader/util/SingleTableSplitUtil.java +++ b/plugin-rdbms-util/src/main/java/com/alibaba/datax/plugin/rdbms/reader/util/SingleTableSplitUtil.java @@ -1,12 +1,12 @@ package com.alibaba.datax.plugin.rdbms.reader.util; import com.alibaba.datax.common.exception.DataXException; +import com.alibaba.datax.common.log.EtlJobLogger; import com.alibaba.datax.common.util.Configuration; import com.alibaba.datax.plugin.rdbms.reader.Constant; import com.alibaba.datax.plugin.rdbms.reader.Key; import com.alibaba.datax.plugin.rdbms.util.*; import com.alibaba.fastjson.JSON; - import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.tuple.ImmutablePair; import org.apache.commons.lang3.tuple.Pair; @@ -116,6 +116,8 @@ public class SingleTableSplitUtil { LOG.info("After split(), allQuerySql=[\n{}\n].", StringUtils.join(allQuerySql, "\n")); + EtlJobLogger.log("After split(), allQuerySql=[\n{}\n].", + StringUtils.join(allQuerySql, "\n")); tempConfig.set(Key.QUERY_SQL, tempQuerySql); pluginParams.add(tempConfig); @@ -171,6 +173,7 @@ public class SingleTableSplitUtil { private static Pair checkSplitPk(Connection conn, String pkRangeSQL, int fetchSize, String table, String username, Configuration configuration) { LOG.info("split pk [sql={}] is running... ", pkRangeSQL); + EtlJobLogger.log("split pk [sql={}] is running... ", pkRangeSQL); ResultSet rs = null; Pair minMaxPK = null; try { @@ -318,6 +321,7 @@ public class SingleTableSplitUtil { Connection conn = DBUtil.getConnection(DATABASE_TYPE, jdbcURL, username, password); LOG.info("split pk [sql={}] is running... ", splitSql); + EtlJobLogger.log("split pk [sql={}] is running... ", splitSql); ResultSet rs = null; List> splitedRange = new ArrayList>(); try { @@ -347,6 +351,7 @@ public class SingleTableSplitUtil { DBUtil.closeDBResources(rs, null, null); } LOG.debug(JSON.toJSONString(splitedRange)); + EtlJobLogger.log(JSON.toJSONString(splitedRange)); List rangeSql = new ArrayList(); int splitedRangeSize = splitedRange.size(); // warn: splitedRangeSize may be 0 or 1,切分规则为IS NULL以及 IS NOT NULL diff --git a/plugin-rdbms-util/src/main/java/com/alibaba/datax/plugin/rdbms/util/DBUtil.java b/plugin-rdbms-util/src/main/java/com/alibaba/datax/plugin/rdbms/util/DBUtil.java index 63d1621b..09e42592 100755 --- a/plugin-rdbms-util/src/main/java/com/alibaba/datax/plugin/rdbms/util/DBUtil.java +++ b/plugin-rdbms-util/src/main/java/com/alibaba/datax/plugin/rdbms/util/DBUtil.java @@ -1,13 +1,13 @@ package com.alibaba.datax.plugin.rdbms.util; import com.alibaba.datax.common.exception.DataXException; +import com.alibaba.datax.common.log.EtlJobLogger; import com.alibaba.datax.common.util.Configuration; import com.alibaba.datax.common.util.RetryUtil; import com.alibaba.datax.plugin.rdbms.reader.Key; import com.alibaba.druid.sql.parser.SQLParserUtils; import com.alibaba.druid.sql.parser.SQLStatementParser; import com.google.common.util.concurrent.ThreadFactoryBuilder; - import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.tuple.ImmutableTriple; import org.apache.commons.lang3.tuple.Triple; @@ -151,13 +151,16 @@ public final class DBUtil { } } else { LOG.warn("SHOW SLAVE STATUS has no result"); + EtlJobLogger.log("SHOW SLAVE STATUS has no result"); } } } else { LOG.warn("SHOW VARIABLES like 'read_only' has no result"); + EtlJobLogger.log("SHOW VARIABLES like 'read_only' has no result"); } } catch (Exception e) { LOG.warn("checkSlave failed, errorMessage:[{}].", e.getMessage()); + EtlJobLogger.log("checkSlave failed, errorMessage:[{}].", e.getMessage()); } return false; } @@ -208,6 +211,7 @@ public final class DBUtil { } } catch (Exception e) { LOG.warn("Check the database has the Insert Privilege failed, errorMessage:[{}]", e.getMessage()); + EtlJobLogger.log("Check the database has the Insert Privilege failed, errorMessage:[{}]", e.getMessage()); } if (tableNames.isEmpty()) return true; @@ -231,10 +235,12 @@ public final class DBUtil { if(e.getMessage() != null && e.getMessage().contains("insufficient privileges")) { hasInsertPrivilege = false; LOG.warn("User [" + userName +"] has no 'insert' privilege on table[" + tableName + "], errorMessage:[{}]", e.getMessage()); + EtlJobLogger.log("User [" + userName + "] has no 'insert' privilege on table[" + tableName + "], errorMessage:[{}]", e.getMessage()); } } else { hasInsertPrivilege = false; LOG.warn("User [" + userName + "] has no 'insert' privilege on table[" + tableName + "], errorMessage:[{}]", e.getMessage()); + EtlJobLogger.log("User [" + userName + "] has no 'insert' privilege on table[" + tableName + "], errorMessage:[{}]", e.getMessage()); } } } @@ -242,6 +248,7 @@ public final class DBUtil { connection.close(); } catch (SQLException e) { LOG.warn("connection close failed, " + e.getMessage()); + EtlJobLogger.log("connection close failed, " + e.getMessage()); } return hasInsertPrivilege; } @@ -260,12 +267,14 @@ public final class DBUtil { } catch (Exception e) { hasInsertPrivilege = false; LOG.warn("User [" + userName +"] has no 'delete' privilege on table[" + tableName + "], errorMessage:[{}]", e.getMessage()); + EtlJobLogger.log("User [" + userName + "] has no 'delete' privilege on table[" + tableName + "], errorMessage:[{}]", e.getMessage()); } } try { connection.close(); } catch (SQLException e) { LOG.warn("connection close failed, " + e.getMessage()); + EtlJobLogger.log("connection close failed, " + e.getMessage()); } return hasInsertPrivilege; } @@ -366,9 +375,11 @@ public final class DBUtil { DBUtilErrorCode.JDBC_OB10_ADDRESS_ERROR, "JDBC OB10格式错误,请联系askdatax"); } LOG.info("this is ob1_0 jdbc url."); + EtlJobLogger.log("this is ob1_0 jdbc url."); user = ss[1].trim() +":"+user; url = ss[2]; LOG.info("this is ob1_0 jdbc url. user="+user+" :url="+url); + EtlJobLogger.log("this is ob1_0 jdbc url. user=" + user + " :url=" + url); } Properties prop = new Properties(); @@ -598,6 +609,8 @@ public final class DBUtil { } catch (Exception e) { LOG.warn("test connection of [{}] failed, for {}.", url, e.getMessage()); + EtlJobLogger.log("test connection of [{}] failed, for {}.", url, + e.getMessage()); } finally { DBUtil.closeDBResources(null, connection); } @@ -613,6 +626,7 @@ public final class DBUtil { for (String pre : preSql) { if (doPreCheck(connection, pre) == false) { LOG.warn("doPreCheck failed."); + EtlJobLogger.log("doPreCheck failed."); return false; } } @@ -621,6 +635,8 @@ public final class DBUtil { } catch (Exception e) { LOG.warn("test connection of [{}] failed, for {}.", url, e.getMessage()); + EtlJobLogger.log("test connection of [{}] failed, for {}.", url, + e.getMessage()); } finally { DBUtil.closeDBResources(null, connection); } @@ -675,6 +691,9 @@ public final class DBUtil { LOG.warn( "pre check failed. It should return one result:0, pre:[{}].", pre); + EtlJobLogger.log( + "pre check failed. It should return one result:0, pre:[{}].", + pre); return false; } @@ -687,9 +706,14 @@ public final class DBUtil { LOG.warn( "pre check failed. It should return one result:0, pre:[{}].", pre); + EtlJobLogger.log( + "pre check failed. It should return one result:0, pre:[{}].", + pre); } catch (Exception e) { LOG.warn("pre check failed. pre:[{}], errorMessage:[{}].", pre, e.getMessage()); + EtlJobLogger.log("pre check failed. pre:[{}], errorMessage:[{}].", pre, + e.getMessage()); } finally { DBUtil.closeResultSet(rs); } @@ -740,6 +764,7 @@ public final class DBUtil { for (String sessionSql : sessions) { LOG.info("execute sql:[{}]", sessionSql); + EtlJobLogger.log("execute sql:[{}]", sessionSql); try { DBUtil.executeSqlWithoutResultSet(stmt, sessionSql); } catch (SQLException e) { diff --git a/plugin-rdbms-util/src/main/java/com/alibaba/datax/plugin/rdbms/writer/CommonRdbmsWriter.java b/plugin-rdbms-util/src/main/java/com/alibaba/datax/plugin/rdbms/writer/CommonRdbmsWriter.java index 440aac2a..20a35b13 100755 --- a/plugin-rdbms-util/src/main/java/com/alibaba/datax/plugin/rdbms/writer/CommonRdbmsWriter.java +++ b/plugin-rdbms-util/src/main/java/com/alibaba/datax/plugin/rdbms/writer/CommonRdbmsWriter.java @@ -3,6 +3,7 @@ package com.alibaba.datax.plugin.rdbms.writer; import com.alibaba.datax.common.element.Column; import com.alibaba.datax.common.element.Record; import com.alibaba.datax.common.exception.DataXException; +import com.alibaba.datax.common.log.EtlJobLogger; import com.alibaba.datax.common.plugin.RecordReceiver; import com.alibaba.datax.common.plugin.TaskPluginCollector; import com.alibaba.datax.common.util.Configuration; @@ -42,6 +43,8 @@ public class CommonRdbmsWriter { LOG.debug("After job init(), originalConfig now is:[\n{}\n]", originalConfig.toJSON()); + EtlJobLogger.log("After job init(), originalConfig now is:[\n{}\n]", + originalConfig.toJSON()); } /*目前只支持MySQL Writer跟Oracle Writer;检查PreSQL跟PostSQL语法以及insert,delete权限*/ @@ -117,6 +120,8 @@ public class CommonRdbmsWriter { jdbcUrl, username, password); LOG.info("Begin to execute preSqls:[{}]. context info:{}.", StringUtils.join(renderedPreSqls, ";"), jdbcUrl); + EtlJobLogger.log("Begin to execute preSqls:[{}]. context info:{}.", + StringUtils.join(renderedPreSqls, ";"), jdbcUrl); WriterUtil.executeSqls(conn, renderedPreSqls, jdbcUrl, dataBaseType); DBUtil.closeDBResources(null, null, conn); @@ -125,6 +130,8 @@ public class CommonRdbmsWriter { LOG.debug("After job prepare(), originalConfig now is:[\n{}\n]", originalConfig.toJSON()); + EtlJobLogger.log("After job prepare(), originalConfig now is:[\n{}\n]", + originalConfig.toJSON()); } public List split(Configuration originalConfig, @@ -159,6 +166,9 @@ public class CommonRdbmsWriter { LOG.info( "Begin to execute postSqls:[{}]. context info:{}.", StringUtils.join(renderedPostSqls, ";"), jdbcUrl); + EtlJobLogger.log( + "Begin to execute postSqls:[{}]. context info:{}.", + StringUtils.join(renderedPostSqls, ";"), jdbcUrl); WriterUtil.executeSqls(conn, renderedPostSqls, jdbcUrl, dataBaseType); DBUtil.closeDBResources(null, null, conn); } @@ -217,9 +227,11 @@ public class CommonRdbmsWriter { DBUtilErrorCode.JDBC_OB10_ADDRESS_ERROR, "JDBC OB10格式错误,请联系askdatax"); } LOG.info("this is ob1_0 jdbc url."); + EtlJobLogger.log("this is ob1_0 jdbc url."); this.username = ss[1].trim() + ":" + this.username; this.jdbcUrl = ss[2]; LOG.info("this is ob1_0 jdbc url. user=" + this.username + " :url=" + this.jdbcUrl); + EtlJobLogger.log("this is ob1_0 jdbc url. user=" + this.username + " :url=" + this.jdbcUrl); } this.table = writerSliceConfig.getString(Key.TABLE); @@ -253,6 +265,8 @@ public class CommonRdbmsWriter { if (tableNumber != 1) { LOG.info("Begin to execute preSqls:[{}]. context info:{}.", StringUtils.join(this.preSqls, ";"), BASIC_MESSAGE); + EtlJobLogger.log("Begin to execute preSqls:[{}]. context info:{}.", + StringUtils.join(this.preSqls, ";"), BASIC_MESSAGE); WriterUtil.executeSqls(connection, this.preSqls, BASIC_MESSAGE, dataBaseType); } @@ -334,6 +348,8 @@ public class CommonRdbmsWriter { LOG.info("Begin to execute postSqls:[{}]. context info:{}.", StringUtils.join(this.postSqls, ";"), BASIC_MESSAGE); + EtlJobLogger.log("Begin to execute postSqls:[{}]. context info:{}.", + StringUtils.join(this.postSqls, ";"), BASIC_MESSAGE); WriterUtil.executeSqls(connection, this.postSqls, BASIC_MESSAGE, dataBaseType); DBUtil.closeDBResources(null, null, connection); } @@ -358,6 +374,7 @@ public class CommonRdbmsWriter { connection.commit(); } catch (SQLException e) { LOG.warn("回滚此次写入, 采用每次写入一行方式提交. 因为:" + e.getMessage()); + EtlJobLogger.log("回滚此次写入, 采用每次写入一行方式提交. 因为:" + e.getMessage()); connection.rollback(); doOneInsert(connection, buffer); } catch (Exception e) { @@ -382,7 +399,7 @@ public class CommonRdbmsWriter { preparedStatement.execute(); } catch (SQLException e) { LOG.debug(e.toString()); - + EtlJobLogger.log(e.toString()); this.taskPluginCollector.collectDirtyRecord(record, e); } finally { // 最后不要忘了关闭 preparedStatement diff --git a/plugin-rdbms-util/src/main/java/com/alibaba/datax/plugin/rdbms/writer/util/OriginalConfPretreatmentUtil.java b/plugin-rdbms-util/src/main/java/com/alibaba/datax/plugin/rdbms/writer/util/OriginalConfPretreatmentUtil.java index c42dd3ea..43df03d7 100755 --- a/plugin-rdbms-util/src/main/java/com/alibaba/datax/plugin/rdbms/writer/util/OriginalConfPretreatmentUtil.java +++ b/plugin-rdbms-util/src/main/java/com/alibaba/datax/plugin/rdbms/writer/util/OriginalConfPretreatmentUtil.java @@ -1,6 +1,7 @@ package com.alibaba.datax.plugin.rdbms.writer.util; import com.alibaba.datax.common.exception.DataXException; +import com.alibaba.datax.common.log.EtlJobLogger; import com.alibaba.datax.common.util.Configuration; import com.alibaba.datax.common.util.ListUtil; import com.alibaba.datax.plugin.rdbms.util.*; @@ -107,9 +108,12 @@ public final class OriginalConfPretreatmentUtil { LOG.info("table:[{}] all columns:[\n{}\n].", oneTable, StringUtils.join(allColumns, ",")); + EtlJobLogger.log("table:[{}] all columns:[\n{}\n].", oneTable, + StringUtils.join(allColumns, ",")); if (1 == userConfiguredColumns.size() && "*".equals(userConfiguredColumns.get(0))) { LOG.warn("您的配置文件中的列配置信息存在风险. 因为您配置的写入数据库表的列为*,当您的表字段个数、类型有变动时,可能影响任务正确性甚至会运行出错。请检查您的配置并作出修改."); + EtlJobLogger.log("您的配置文件中的列配置信息存在风险. 因为您配置的写入数据库表的列为*,当您的表字段个数、类型有变动时,可能影响任务正确性甚至会运行出错。请检查您的配置并作出修改."); // 回填其值,需要以 String 的方式转交后续处理 originalConfig.set(Key.COLUMN, allColumns); @@ -163,6 +167,7 @@ public final class OriginalConfPretreatmentUtil { String writeDataSqlTemplate = WriterUtil.getWriteTemplate(columns, valueHolders, writeMode,dataBaseType, forceUseUpdate); LOG.info("Write data [\n{}\n], which jdbcUrl like:[{}]", writeDataSqlTemplate, jdbcUrl); + EtlJobLogger.log("Write data [\n{}\n], which jdbcUrl like:[{}]", writeDataSqlTemplate, jdbcUrl); originalConfig.set(Constant.INSERT_OR_REPLACE_TEMPLATE_MARK, writeDataSqlTemplate); } diff --git a/plugin-rdbms-util/src/main/java/com/alibaba/datax/plugin/rdbms/writer/util/WriterUtil.java b/plugin-rdbms-util/src/main/java/com/alibaba/datax/plugin/rdbms/writer/util/WriterUtil.java index 5f5f0d51..e33d29ce 100755 --- a/plugin-rdbms-util/src/main/java/com/alibaba/datax/plugin/rdbms/writer/util/WriterUtil.java +++ b/plugin-rdbms-util/src/main/java/com/alibaba/datax/plugin/rdbms/writer/util/WriterUtil.java @@ -1,6 +1,7 @@ package com.alibaba.datax.plugin.rdbms.writer.util; import com.alibaba.datax.common.exception.DataXException; +import com.alibaba.datax.common.log.EtlJobLogger; import com.alibaba.datax.common.util.Configuration; import com.alibaba.datax.plugin.rdbms.util.DBUtil; import com.alibaba.datax.plugin.rdbms.util.DBUtilErrorCode; @@ -15,7 +16,9 @@ import org.slf4j.LoggerFactory; import java.sql.Connection; import java.sql.Statement; -import java.util.*; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; public final class WriterUtil { private static final Logger LOG = LoggerFactory.getLogger(WriterUtil.class); @@ -180,6 +183,8 @@ public final class WriterUtil { if (null != renderedPreSqls && !renderedPreSqls.isEmpty()) { LOG.info("Begin to preCheck preSqls:[{}].", StringUtils.join(renderedPreSqls, ";")); + EtlJobLogger.log("Begin to preCheck preSqls:[{}].", + StringUtils.join(renderedPreSqls, ";")); for(String sql : renderedPreSqls) { try{ DBUtil.sqlValid(sql, type); @@ -203,6 +208,8 @@ public final class WriterUtil { LOG.info("Begin to preCheck postSqls:[{}].", StringUtils.join(renderedPostSqls, ";")); + EtlJobLogger.log("Begin to preCheck postSqls:[{}].", + StringUtils.join(renderedPostSqls, ";")); for(String sql : renderedPostSqls) { try{ DBUtil.sqlValid(sql, type); diff --git a/plugin-unstructured-storage-util/src/main/java/com/alibaba/datax/plugin/unstructuredstorage/reader/UnstructuredStorageReaderUtil.java b/plugin-unstructured-storage-util/src/main/java/com/alibaba/datax/plugin/unstructuredstorage/reader/UnstructuredStorageReaderUtil.java index 423f66db..b9dc03c4 100755 --- a/plugin-unstructured-storage-util/src/main/java/com/alibaba/datax/plugin/unstructuredstorage/reader/UnstructuredStorageReaderUtil.java +++ b/plugin-unstructured-storage-util/src/main/java/com/alibaba/datax/plugin/unstructuredstorage/reader/UnstructuredStorageReaderUtil.java @@ -2,6 +2,7 @@ package com.alibaba.datax.plugin.unstructuredstorage.reader; import com.alibaba.datax.common.element.*; import com.alibaba.datax.common.exception.DataXException; +import com.alibaba.datax.common.log.EtlJobLogger; import com.alibaba.datax.common.plugin.RecordSender; import com.alibaba.datax.common.plugin.TaskPluginCollector; import com.alibaba.datax.common.util.Configuration; @@ -9,10 +10,11 @@ import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject; import com.alibaba.fastjson.TypeReference; import com.csvreader.CsvReader; -import org.apache.commons.beanutils.BeanUtils; import io.airlift.compress.snappy.SnappyCodec; import io.airlift.compress.snappy.SnappyFramedInputStream; -import org.anarres.lzo.*; +import org.anarres.lzo.LzoDecompressor1x_safe; +import org.anarres.lzo.LzoInputStream; +import org.apache.commons.beanutils.BeanUtils; import org.apache.commons.compress.compressors.CompressorInputStream; import org.apache.commons.compress.compressors.bzip2.BZip2CompressorInputStream; import org.apache.commons.compress.compressors.gzip.GzipCompressorInputStream; @@ -99,6 +101,8 @@ public class UnstructuredStorageReaderUtil { encoding = Constant.DEFAULT_ENCODING; LOG.warn(String.format("您配置的encoding为[%s], 使用默认值[%s]", encoding, Constant.DEFAULT_ENCODING)); + EtlJobLogger.log(String.format("您配置的encoding为[%s], 使用默认值[%s]", encoding, + Constant.DEFAULT_ENCODING)); } List column = readerSliceConfig @@ -252,6 +256,8 @@ public class UnstructuredStorageReaderUtil { if (null == delimiterInStr) { LOG.warn(String.format("您没有配置列分隔符, 使用默认值[%s]", Constant.DEFAULT_FIELD_DELIMITER)); + EtlJobLogger.log(String.format("您没有配置列分隔符, 使用默认值[%s]", + Constant.DEFAULT_FIELD_DELIMITER)); } // warn: default value ',', fieldDelimiter could be \n(lineDelimiter) @@ -277,6 +283,8 @@ public class UnstructuredStorageReaderUtil { String fetchLine = reader.readLine(); LOG.info(String.format("Header line %s has been skiped.", fetchLine)); + EtlJobLogger.log(String.format("Header line %s has been skiped.", + fetchLine)); } csvReader = new CsvReader(reader); csvReader.setDelimiter(fieldDelimiter); @@ -329,6 +337,8 @@ public class UnstructuredStorageReaderUtil { if (null == delimiterInStr) { LOG.warn(String.format("您没有配置列分隔符, 使用默认值[%s]", Constant.DEFAULT_FIELD_DELIMITER)); + EtlJobLogger.log(String.format("您没有配置列分隔符, 使用默认值[%s]", + Constant.DEFAULT_FIELD_DELIMITER)); } // warn: default value ',', fieldDelimiter could be \n(lineDelimiter) // for no fieldDelimiter @@ -388,6 +398,7 @@ public class UnstructuredStorageReaderUtil { sourceLine.length, columnIndex + 1, StringUtils.join(sourceLine, ",")); LOG.warn(message); + EtlJobLogger.log(message); throw new IndexOutOfBoundsException(message); } @@ -463,6 +474,7 @@ public class UnstructuredStorageReaderUtil { String errorMessage = String.format( "您配置的列类型暂不支持 : [%s]", columnType); LOG.error(errorMessage); + EtlJobLogger.log(errorMessage); throw DataXException .asDataXException( UnstructuredStorageReaderErrorCode.NOT_SUPPORT_TYPE, @@ -632,6 +644,7 @@ public class UnstructuredStorageReaderUtil { UnstructuredStorageReaderUtil.csvReaderConfigMap = JSON.parseObject(csvReaderConfig, new TypeReference>() {}); }catch (Exception e) { LOG.info(String.format("WARN!!!!忽略csvReaderConfig配置! 配置错误,值只能为空或者为Map结构,您配置的值为: %s", csvReaderConfig)); + EtlJobLogger.log(String.format("WARN!!!!忽略csvReaderConfig配置! 配置错误,值只能为空或者为Map结构,您配置的值为: %s", csvReaderConfig)); } } } @@ -685,14 +698,18 @@ public class UnstructuredStorageReaderUtil { try { BeanUtils.populate(csvReader,UnstructuredStorageReaderUtil.csvReaderConfigMap); LOG.info(String.format("csvReaderConfig设置成功,设置后CsvReader:%s", JSON.toJSONString(csvReader))); + EtlJobLogger.log(String.format("csvReaderConfig设置成功,设置后CsvReader:%s", JSON.toJSONString(csvReader))); } catch (Exception e) { LOG.info(String.format("WARN!!!!忽略csvReaderConfig配置!通过BeanUtils.populate配置您的csvReaderConfig发生异常,您配置的值为: %s;请检查您的配置!CsvReader使用默认值[%s]", JSON.toJSONString(UnstructuredStorageReaderUtil.csvReaderConfigMap),JSON.toJSONString(csvReader))); + EtlJobLogger.log(String.format("WARN!!!!忽略csvReaderConfig配置!通过BeanUtils.populate配置您的csvReaderConfig发生异常,您配置的值为: %s;请检查您的配置!CsvReader使用默认值[%s]", + JSON.toJSONString(UnstructuredStorageReaderUtil.csvReaderConfigMap), JSON.toJSONString(csvReader))); } }else { //默认关闭安全模式, 放开10W字节的限制 csvReader.setSafetySwitch(false); LOG.info(String.format("CsvReader使用默认值[%s],csvReaderConfig值为[%s]",JSON.toJSONString(csvReader),JSON.toJSONString(UnstructuredStorageReaderUtil.csvReaderConfigMap))); + EtlJobLogger.log(String.format("CsvReader使用默认值[%s],csvReaderConfig值为[%s]", JSON.toJSONString(csvReader), JSON.toJSONString(UnstructuredStorageReaderUtil.csvReaderConfigMap))); } } } diff --git a/plugin-unstructured-storage-util/src/main/java/com/alibaba/datax/plugin/unstructuredstorage/writer/TextCsvWriterManager.java b/plugin-unstructured-storage-util/src/main/java/com/alibaba/datax/plugin/unstructuredstorage/writer/TextCsvWriterManager.java index 1ea82759..45b9c21a 100644 --- a/plugin-unstructured-storage-util/src/main/java/com/alibaba/datax/plugin/unstructuredstorage/writer/TextCsvWriterManager.java +++ b/plugin-unstructured-storage-util/src/main/java/com/alibaba/datax/plugin/unstructuredstorage/writer/TextCsvWriterManager.java @@ -1,15 +1,15 @@ package com.alibaba.datax.plugin.unstructuredstorage.writer; -import java.io.IOException; -import java.io.Writer; -import java.util.List; - +import com.alibaba.datax.common.log.EtlJobLogger; +import com.csvreader.CsvWriter; import org.apache.commons.io.IOUtils; import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.csvreader.CsvWriter; +import java.io.IOException; +import java.io.Writer; +import java.util.List; public class TextCsvWriterManager { public static UnstructuredWriter produceUnstructuredWriter( @@ -43,6 +43,7 @@ class CsvWriterImpl implements UnstructuredWriter { public void writeOneRecord(List splitedRows) throws IOException { if (splitedRows.isEmpty()) { LOG.info("Found one record line which is empty."); + EtlJobLogger.log("Found one record line which is empty."); } this.csvWriter.writeRecord((String[]) splitedRows .toArray(new String[0])); @@ -76,6 +77,7 @@ class TextWriterImpl implements UnstructuredWriter { public void writeOneRecord(List splitedRows) throws IOException { if (splitedRows.isEmpty()) { LOG.info("Found one record line which is empty."); + EtlJobLogger.log("Found one record line which is empty."); } this.textWriter.write(String.format("%s%s", StringUtils.join(splitedRows, this.fieldDelimiter), diff --git a/plugin-unstructured-storage-util/src/main/java/com/alibaba/datax/plugin/unstructuredstorage/writer/UnstructuredStorageWriterUtil.java b/plugin-unstructured-storage-util/src/main/java/com/alibaba/datax/plugin/unstructuredstorage/writer/UnstructuredStorageWriterUtil.java index b1927ce7..c792c429 100755 --- a/plugin-unstructured-storage-util/src/main/java/com/alibaba/datax/plugin/unstructuredstorage/writer/UnstructuredStorageWriterUtil.java +++ b/plugin-unstructured-storage-util/src/main/java/com/alibaba/datax/plugin/unstructuredstorage/writer/UnstructuredStorageWriterUtil.java @@ -1,18 +1,14 @@ package com.alibaba.datax.plugin.unstructuredstorage.writer; -import java.io.BufferedWriter; -import java.io.IOException; -import java.io.OutputStream; -import java.io.OutputStreamWriter; -import java.io.UnsupportedEncodingException; -import java.text.DateFormat; -import java.text.SimpleDateFormat; -import java.util.ArrayList; -import java.util.HashSet; -import java.util.List; -import java.util.Set; -import java.util.UUID; - +import com.alibaba.datax.common.element.Column; +import com.alibaba.datax.common.element.DateColumn; +import com.alibaba.datax.common.element.Record; +import com.alibaba.datax.common.exception.DataXException; +import com.alibaba.datax.common.log.EtlJobLogger; +import com.alibaba.datax.common.plugin.RecordReceiver; +import com.alibaba.datax.common.plugin.TaskPluginCollector; +import com.alibaba.datax.common.util.Configuration; +import com.google.common.collect.Sets; import org.apache.commons.compress.compressors.CompressorOutputStream; import org.apache.commons.compress.compressors.bzip2.BZip2CompressorOutputStream; import org.apache.commons.compress.compressors.gzip.GzipCompressorOutputStream; @@ -22,14 +18,10 @@ import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.alibaba.datax.common.element.Column; -import com.alibaba.datax.common.element.DateColumn; -import com.alibaba.datax.common.element.Record; -import com.alibaba.datax.common.exception.DataXException; -import com.alibaba.datax.common.plugin.RecordReceiver; -import com.alibaba.datax.common.plugin.TaskPluginCollector; -import com.alibaba.datax.common.util.Configuration; -import com.google.common.collect.Sets; +import java.io.*; +import java.text.DateFormat; +import java.text.SimpleDateFormat; +import java.util.*; public class UnstructuredStorageWriterUtil { private UnstructuredStorageWriterUtil() { @@ -66,6 +58,8 @@ public class UnstructuredStorageWriterUtil { // like " ", null LOG.warn(String.format("您的encoding配置为空, 将使用默认值[%s]", Constant.DEFAULT_ENCODING)); + EtlJobLogger.log(String.format("您的encoding配置为空, 将使用默认值[%s]", + Constant.DEFAULT_ENCODING)); writerConfiguration.set(Key.ENCODING, Constant.DEFAULT_ENCODING); } else { try { @@ -107,6 +101,8 @@ public class UnstructuredStorageWriterUtil { if (null == delimiterInStr) { LOG.warn(String.format("您没有配置列分隔符, 使用默认值[%s]", Constant.DEFAULT_FIELD_DELIMITER)); + EtlJobLogger.log(String.format("您没有配置列分隔符, 使用默认值[%s]", + Constant.DEFAULT_FIELD_DELIMITER)); writerConfiguration.set(Key.FIELD_DELIMITER, Constant.DEFAULT_FIELD_DELIMITER); } @@ -126,6 +122,7 @@ public class UnstructuredStorageWriterUtil { public static List split(Configuration writerSliceConfig, Set originAllFileExists, int mandatoryNumber) { LOG.info("begin do split..."); + EtlJobLogger.log("begin do split..."); Set allFileExists = new HashSet(); allFileExists.addAll(originAllFileExists); List writerSplitConfigs = new ArrayList(); @@ -146,9 +143,12 @@ public class UnstructuredStorageWriterUtil { splitedTaskConfig.set(Key.FILE_NAME, fullFileName); LOG.info(String .format("splited write file name:[%s]", fullFileName)); + EtlJobLogger.log(String + .format("splited write file name:[%s]", fullFileName)); writerSplitConfigs.add(splitedTaskConfig); } LOG.info("end do split."); + EtlJobLogger.log("end do split."); return writerSplitConfigs; } @@ -187,6 +187,8 @@ public class UnstructuredStorageWriterUtil { if (StringUtils.isBlank(encoding)) { LOG.warn(String.format("您配置的encoding为[%s], 使用默认值[%s]", encoding, Constant.DEFAULT_ENCODING)); + EtlJobLogger.log(String.format("您配置的encoding为[%s], 使用默认值[%s]", encoding, + Constant.DEFAULT_ENCODING)); encoding = Constant.DEFAULT_ENCODING; } String compress = config.getString(Key.COMPRESS); @@ -264,6 +266,8 @@ public class UnstructuredStorageWriterUtil { if (null == delimiterInStr) { LOG.warn(String.format("您没有配置列分隔符, 使用默认值[%s]", Constant.DEFAULT_FIELD_DELIMITER)); + EtlJobLogger.log(String.format("您没有配置列分隔符, 使用默认值[%s]", + Constant.DEFAULT_FIELD_DELIMITER)); } // warn: fieldDelimiter could not be '' for no fieldDelimiter