diff --git a/core/src/main/job/mongodb2tdengine.json b/core/src/main/job/mongodb2tdengine.json index 4cfc987e..45e5a640 100644 --- a/core/src/main/job/mongodb2tdengine.json +++ b/core/src/main/job/mongodb2tdengine.json @@ -11,7 +11,7 @@ "name": "mongodbreader", "parameter": { "address": [ - "123.56.104.14:27017" + "127.0.0.1:27017" ], "userName": "admin678", "mechanism": "SCRAM-SHA-1", @@ -50,7 +50,7 @@ "writer": { "name": "tdenginewriter", "parameter": { - "host": "123.56.104.14", + "host": "127.0.0.1", "port": 6030, "dbname": "test", "user": "root", diff --git a/tdenginewriter/doc/tdenginewriter.md b/tdenginewriter/doc/tdenginewriter.md index c9c222a2..9ab64a2d 100644 --- a/tdenginewriter/doc/tdenginewriter.md +++ b/tdenginewriter/doc/tdenginewriter.md @@ -290,6 +290,9 @@ TAGS( ## 5 约束限制 +1. 本插件自动创建超级表时NCHAR类型的长度固定为64,对于包含长度大于64的字符串的数据源,将不支持。 +2. 标签列不能包含null值,如果包含会被过滤掉。 + ## FAQ ### 如何选取要同步的数据的范围? @@ -300,10 +303,14 @@ TAGS( 如果Reader插件支持一次读多张表,Writer插件就能一次导入多张表。如果Reader不支持多多张表,可以建多个job,分别导入。Writer插件只负责写数据。 -### 1张源表导入之后对应TDengine中多少张表? +### 一张源表导入之后对应TDengine中多少张表? 这是又tagColumn决定的,如果所有tag列的值都相同,目标表也只有一个。源表有多少不同的tag组合,目标超表就会有多少子表。 ### 源表和目标表的字段顺序一致吗? -TDengine要求每个表第一列是时间戳列,后边是普通字段,最后是标签列。如果源表不是这个顺序,插件在自动建表是自动调整。 \ No newline at end of file +TDengine要求每个表第一列是时间戳列,后边是普通字段,最后是标签列。如果源表不是这个顺序,插件在自动建表是自动调整。 + +### 插件如何确定各列的数据类型? + +抽样收到的第一批数据自动推断各列的类型。schema是从数据来的,因此要保障“好的”数据占大多数。 \ No newline at end of file diff --git a/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/DataHandler.java b/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/DataHandler.java index 686ac27b..421c2fe4 100644 --- a/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/DataHandler.java +++ b/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/DataHandler.java @@ -1,10 +1,11 @@ package com.alibaba.datax.plugin.writer.tdenginewriter; import com.alibaba.datax.common.plugin.RecordReceiver; +import com.alibaba.datax.common.plugin.TaskPluginCollector; import java.util.Properties; public interface DataHandler { - long handle(RecordReceiver lineReceiver, Properties properties); + long handle(RecordReceiver lineReceiver, Properties properties, TaskPluginCollector collector); } diff --git a/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/DefaultDataHandler.java b/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/DefaultDataHandler.java index 733f49c5..9250910a 100644 --- a/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/DefaultDataHandler.java +++ b/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/DefaultDataHandler.java @@ -2,6 +2,7 @@ package com.alibaba.datax.plugin.writer.tdenginewriter; import com.alibaba.datax.common.element.Record; import com.alibaba.datax.common.plugin.RecordReceiver; +import com.alibaba.datax.common.plugin.TaskPluginCollector; import com.taosdata.jdbc.TSDBPreparedStatement; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -16,7 +17,6 @@ import java.util.Properties; */ public class DefaultDataHandler implements DataHandler { private static final Logger LOG = LoggerFactory.getLogger(DefaultDataHandler.class); - static { try { Class.forName("com.taosdata.jdbc.TSDBDriver"); @@ -26,7 +26,7 @@ public class DefaultDataHandler implements DataHandler { } @Override - public long handle(RecordReceiver lineReceiver, Properties properties) { + public long handle(RecordReceiver lineReceiver, Properties properties, TaskPluginCollector collector) { SchemaManager schemaManager = new SchemaManager(properties); if (!schemaManager.configValid()) { return 0; @@ -47,7 +47,11 @@ public class DefaultDataHandler implements DataHandler { } int batchSize = Integer.parseInt(properties.getProperty(Key.BATCH_SIZE, "1000")); - return write(lineReceiver, conn, batchSize, schemaManager); + if (batchSize < 5) { + LOG.error("batchSize太小,会增加自动类型推断错误的概率,建议改大后重试"); + return 0; + } + return write(lineReceiver, conn, batchSize, schemaManager, collector); } catch (Exception e) { LOG.error("write failed " + e.getMessage()); e.printStackTrace(); @@ -79,18 +83,15 @@ public class DefaultDataHandler implements DataHandler { * @return 成功写入记录数 * @throws SQLException */ - private long write(RecordReceiver lineReceiver, Connection conn, int batchSize, SchemaManager scm) throws SQLException { + private long write(RecordReceiver lineReceiver, Connection conn, int batchSize, SchemaManager scm, TaskPluginCollector collector) throws SQLException { Record record = lineReceiver.getFromReader(); if (record == null) { return 0; } - if (scm.shouldCreateTable()) { - scm.createSTable(conn, record); - } String pq = String.format("INSERT INTO ? USING %s TAGS(%s) (%s) values (%s)", scm.getStable(), scm.getTagValuesPlaceHolder(), scm.getJoinedFieldNames(), scm.getFieldValuesPlaceHolder()); LOG.info("Prepared SQL: {}", pq); try (TSDBPreparedStatement stmt = (TSDBPreparedStatement) conn.prepareStatement(pq)) { - JDBCBatchWriter batchWriter = new JDBCBatchWriter(stmt, scm, batchSize); + JDBCBatchWriter batchWriter = new JDBCBatchWriter(conn, stmt, scm, batchSize, collector); do { batchWriter.append(record); } while ((record = lineReceiver.getFromReader()) != null); diff --git a/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/JDBCBatchWriter.java b/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/JDBCBatchWriter.java index 17023d03..21974e93 100644 --- a/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/JDBCBatchWriter.java +++ b/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/JDBCBatchWriter.java @@ -3,10 +3,13 @@ package com.alibaba.datax.plugin.writer.tdenginewriter; import com.alibaba.datax.common.element.Column; import com.alibaba.datax.common.element.Record; import com.alibaba.datax.common.exception.DataXException; +import com.alibaba.datax.common.plugin.AbstractTaskPlugin; +import com.alibaba.datax.common.plugin.TaskPluginCollector; import com.taosdata.jdbc.TSDBPreparedStatement; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.sql.Connection; import java.sql.SQLException; import java.util.ArrayList; import java.util.HashMap; @@ -26,10 +29,12 @@ import java.util.stream.Collectors; */ public class JDBCBatchWriter { public static final Logger LOG = LoggerFactory.getLogger(JDBCBatchWriter.class); - private TSDBPreparedStatement stmt; private SchemaManager scm; + private Connection conn; private int batchSize; + private TaskPluginCollector collector; + // 缓存Record, key为tableName Map> buf = new HashMap<>(); // 缓存表的标签值, key为tableName @@ -37,25 +42,57 @@ public class JDBCBatchWriter { private long sucCount = 0; private final int tsColIndex; private List fieldList; + // 每个record至少应该包含的列数,用于检测数据 + private int minColNum = 0; private Map fieldIndexMap; + private List fieldTypes = null; - public JDBCBatchWriter(TSDBPreparedStatement stmt, SchemaManager scm, int batchSize) { + public JDBCBatchWriter(Connection conn, TSDBPreparedStatement stmt, SchemaManager scm, int batchSize, TaskPluginCollector collector) { + this.conn = conn; this.stmt = stmt; this.scm = scm; this.batchSize = batchSize; + this.collector = collector; this.tsColIndex = scm.getTsColIndex(); this.fieldList = scm.getFieldList(); this.fieldIndexMap = scm.getFieldIndexMap(); + this.minColNum = 1 + fieldList.size() + scm.getDynamicTagCount(); + } + public void initFiledTypesAndTargetTable(List records) throws SQLException { + if (fieldTypes != null) { + return; + } + guessFieldTypes(records); + if (scm.shouldCreateTable()) { + scm.createSTable(conn, fieldTypes); + } + } public void append(Record record) throws SQLException { + int columnNum = record.getColumnNumber(); + if (columnNum < minColNum) { + collector.collectDirtyRecord(record, "实际列数小于期望列数"); + return; + } String[] tagValues = scm.getTagValuesFromRecord(record); + if (tagValues == null) { + collector.collectDirtyRecord(record, "标签列包含null"); + return; + } + if (!scm.hasTimestamp(record)) { + collector.collectDirtyRecord(record, "时间戳列为null或类型错误"); + return; + } String tableName = scm.computeTableName(tagValues); if (buf.containsKey(tableName)) { List lis = buf.get(tableName); lis.add(record); if (lis.size() == batchSize) { + if (fieldTypes == null) { + initFiledTypesAndTargetTable(lis); + } executeBatch(tableName); lis.clear(); } @@ -67,6 +104,49 @@ public class JDBCBatchWriter { } } + /** + * 只有String类型比较特别,测试发现值为null的列会转成String类型。所以Column的类型为String并不代表这一列的类型真的是String。 + * + * @param records + */ + private void guessFieldTypes(List records) { + fieldTypes = new ArrayList<>(fieldList.size()); + for (int i = 0; i < fieldList.size(); ++i) { + int colIndex = fieldIndexMap.get(fieldList.get(i)); + boolean ok = false; + for (int j = 0; j < records.size() && !ok; ++j) { + Column column = records.get(j).getColumn(colIndex); + Column.Type type = column.getType(); + switch (type) { + case LONG: + case DOUBLE: + case DATE: + case BOOL: + case BYTES: + if (column.getRawData() != null) { + fieldTypes.add(type); + ok = true; + } + break; + case STRING: + // 只有非null且非空的String列,才会被真的当作String类型。 + String value = column.asString(); + if (value != null && !"".equals(value)) { + fieldTypes.add(type); + ok = true; + } + break; + default: + throw DataXException.asDataXException(TDengineWriterErrorCode.TYPE_ERROR, fieldTypes.get(i).toString()); + } + } + if (!ok) { + throw DataXException.asDataXException(TDengineWriterErrorCode.TYPE_ERROR, String.format("根据采样的%d条数据,无法推断第%d列的数据类型", records.size(), i + 1)); + } + } + LOG.info("Field Types: {}", fieldTypes); + } + /** * 执行单表批量写入 * @@ -87,12 +167,10 @@ public class JDBCBatchWriter { ArrayList tsList = records.stream().map(r -> r.getColumn(tsColIndex).asDate().getTime()).collect(Collectors.toCollection(ArrayList::new)); stmt.setTimestamp(0, tsList); // 字段 - Record record = records.get(0); for (int i = 0; i < fieldList.size(); ) { String fieldName = fieldList.get(i); int index = fieldIndexMap.get(fieldName); - Column column = record.getColumn(index); - switch (column.getType()) { + switch (fieldTypes.get(i)) { case LONG: ArrayList lisLong = records.stream().map(r -> r.getColumn(index).asBigInteger().longValue()).collect(Collectors.toCollection(ArrayList::new)); stmt.setLong(++i, lisLong); @@ -118,7 +196,7 @@ public class JDBCBatchWriter { stmt.setString(++i, lisBytes, 64); break; default: - throw DataXException.asDataXException(TDengineWriterErrorCode.TYPE_ERROR, column.getType().toString()); + throw DataXException.asDataXException(TDengineWriterErrorCode.TYPE_ERROR, fieldTypes.get(i).toString()); } } // 执行 @@ -132,6 +210,16 @@ public class JDBCBatchWriter { * 把缓存的Record全部写入 */ public void flush() throws SQLException { + if (fieldTypes == null) { + List records = new ArrayList<>(); + for (List lis : buf.values()) { + records.addAll(lis); + if (records.size() > 100) { + break; + } + } + initFiledTypesAndTargetTable(records); + } for (String tabName : buf.keySet()) { if (buf.get(tabName).size() > 0) { executeBatch(tabName); diff --git a/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/OpentsdbDataHandler.java b/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/OpentsdbDataHandler.java index 52f1aa7a..e1b8f5dd 100644 --- a/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/OpentsdbDataHandler.java +++ b/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/OpentsdbDataHandler.java @@ -4,6 +4,7 @@ import com.alibaba.datax.common.element.Column; import com.alibaba.datax.common.element.Record; import com.alibaba.datax.common.exception.DataXException; import com.alibaba.datax.common.plugin.RecordReceiver; +import com.alibaba.datax.common.plugin.TaskPluginCollector; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -14,7 +15,7 @@ public class OpentsdbDataHandler implements DataHandler { private static final String DEFAULT_BATCH_SIZE = "1"; @Override - public long handle(RecordReceiver lineReceiver, Properties properties) { + public long handle(RecordReceiver lineReceiver, Properties properties, TaskPluginCollector collector) { // opentsdb json protocol use JNI and schemaless API to write String host = properties.getProperty(Key.HOST); int port = Integer.parseInt(properties.getProperty(Key.PORT)); diff --git a/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/SchemaManager.java b/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/SchemaManager.java index b3d7b7e3..21b8ef01 100644 --- a/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/SchemaManager.java +++ b/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/SchemaManager.java @@ -176,14 +176,15 @@ public class SchemaManager { return stables; } - public void createSTable(Connection conn, Record record) throws SQLException { + public void createSTable(Connection conn, List fieldTypes) throws SQLException { StringBuilder sb = new StringBuilder(); sb.append("CREATE STABLE IF NOT EXISTS ").append(stable).append("("); sb.append(tsColName).append(" ").append("TIMESTAMP,"); - for (String fieldName : fieldList) { + for (int i = 0; i < fieldList.size(); ++i) { + String fieldName = fieldList.get(i); + Column.Type dxType = fieldTypes.get(i); sb.append(fieldName).append(' '); - Column col = record.getColumn(fieldIndexMap.get(fieldName)); - String tdType = mapDataxType(col.getType()); + String tdType = mapDataxType(dxType); sb.append(tdType).append(','); } sb.deleteCharAt(sb.length() - 1); @@ -209,10 +210,22 @@ public class SchemaManager { int tagIndex = tagIndexMap.get(tagList.get(i)); tagValues[i] = record.getColumn(tagIndex).asString(); } + if (tagValues[i] == null) { + return null; + } } return tagValues; } + public boolean hasTimestamp(Record record) { + Column column = record.getColumn(tsColIndex); + if (column.getType() == Column.Type.DATE && column.asDate() != null) { + return true; + } else { + return false; + } + } + public Map getFieldIndexMap() { return fieldIndexMap; } @@ -252,4 +265,8 @@ public class SchemaManager { String s = String.join("!", tagValues); return "t_" + DigestUtils.md5Hex(s); } + + public int getDynamicTagCount() { + return tagIndexMap.size(); + } } diff --git a/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/TDengineWriter.java b/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/TDengineWriter.java index 70ea5737..cd223792 100644 --- a/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/TDengineWriter.java +++ b/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/TDengineWriter.java @@ -74,7 +74,7 @@ public class TDengineWriter extends Writer { String peerPluginName = this.writerSliceConfig.getString(PEER_PLUGIN_NAME); LOG.debug("start to handle record from: " + peerPluginName); DataHandler handler = DataHandlerFactory.build(peerPluginName); - long records = handler.handle(lineReceiver, properties); + long records = handler.handle(lineReceiver, properties, getTaskPluginCollector()); LOG.debug("handle data finished, records: " + records); } diff --git a/tdenginewriter/src/test/java/com/alibaba/datax/plugin/writer/tdenginewriter/TDengineWriterTest.java b/tdenginewriter/src/test/java/com/alibaba/datax/plugin/writer/tdenginewriter/TDengineWriterTest.java index 43928db9..62bf7040 100644 --- a/tdenginewriter/src/test/java/com/alibaba/datax/plugin/writer/tdenginewriter/TDengineWriterTest.java +++ b/tdenginewriter/src/test/java/com/alibaba/datax/plugin/writer/tdenginewriter/TDengineWriterTest.java @@ -5,6 +5,7 @@ import org.junit.Test; import java.sql.Connection; import java.sql.DriverManager; import java.sql.SQLException; +import java.sql.Statement; public class TDengineWriterTest { @@ -18,4 +19,13 @@ public class TDengineWriterTest { schemaManager.setStable("test1"); schemaManager.getFromDB(conn); } + + @Test + public void dropTestTable() throws ClassNotFoundException, SQLException { + Class.forName("com.taosdata.jdbc.TSDBDriver"); + String jdbcUrl = String.format("jdbc:TAOS://%s:%s/%s?user=%s&password=%s", "wozai.fun", "6030", "test", "root", "taosdata"); + Connection conn = DriverManager.getConnection(jdbcUrl); + Statement stmt = conn.createStatement(); + stmt.execute("drop table market_snapshot"); + } }