From 4d75eaabfa15400bf56c9092be78a3e347f504c5 Mon Sep 17 00:00:00 2001 From: walkerJing <1058304821@qq.com> Date: Sat, 14 Mar 2020 23:22:40 +0800 Subject: [PATCH] =?UTF-8?q?json=E6=95=B0=E6=8D=AE=E6=BA=90=E6=9E=84?= =?UTF-8?q?=E5=BB=BA=E6=94=AF=E6=8C=81mongodb?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../controller/DatasourceQueryController.java | 25 + .../wugui/datax/admin/dto/DataxJsonDto.java | 8 +- .../datax/admin/dto/MongoDBReaderDto.java | 24 + .../datax/admin/dto/MongoDBWriterDto.java | 32 + .../admin/service/DatasourceQueryService.java | 16 +- .../impl/DatasourceQueryServiceImpl.java | 40 +- .../impl/JobDatasourceServiceImpl.java | 4 +- .../admin/tool/datax/DataxJsonHelper.java | 148 ++-- .../admin/tool/datax/DataxJsonInterface.java | 10 +- .../tool/datax/DataxPluginInterface.java | 9 + .../tool/datax/reader/BaseReaderPlugin.java | 6 + .../tool/datax/reader/MongoDBReader.java | 32 + .../admin/tool/datax/reader/OracleReader.java | 2 + .../tool/datax/writer/BaseWriterPlugin.java | 10 +- .../admin/tool/datax/writer/HBaseWriter.java | 8 +- .../tool/datax/writer/MongoDBWriter.java | 40 + .../tool/datax/writer/SqlServerlWriter.java | 2 +- .../admin/tool/meta/DatabaseMetaFactory.java | 3 +- .../admin/tool/pojo/DataxMongoDBPojo.java | 45 ++ .../datax/admin/tool/query/BaseQueryTool.java | 8 +- .../admin/tool/query/MongoDBQueryTool.java | 55 +- .../admin/tool/query/QueryToolFactory.java | 29 +- .../datax/admin/util/DataSourceConstants.java | 10 - .../wugui/datax/admin/util/JdbcConstants.java | 119 +++ .../com/wugui/datax/admin/util/JdbcUtils.java | 683 ++++++++++++++++++ .../datax/admin/util/RdbmsException.java | 8 +- datax-admin/src/main/resources/logback.xml | 2 +- 27 files changed, 1265 insertions(+), 113 deletions(-) create mode 100644 datax-admin/src/main/java/com/wugui/datax/admin/dto/MongoDBReaderDto.java create mode 100644 datax-admin/src/main/java/com/wugui/datax/admin/dto/MongoDBWriterDto.java create mode 100644 datax-admin/src/main/java/com/wugui/datax/admin/tool/datax/reader/MongoDBReader.java create mode 100644 datax-admin/src/main/java/com/wugui/datax/admin/tool/datax/writer/MongoDBWriter.java create mode 100644 datax-admin/src/main/java/com/wugui/datax/admin/tool/pojo/DataxMongoDBPojo.java delete mode 100644 datax-admin/src/main/java/com/wugui/datax/admin/util/DataSourceConstants.java create mode 100644 datax-admin/src/main/java/com/wugui/datax/admin/util/JdbcConstants.java create mode 100644 datax-admin/src/main/java/com/wugui/datax/admin/util/JdbcUtils.java diff --git a/datax-admin/src/main/java/com/wugui/datax/admin/controller/DatasourceQueryController.java b/datax-admin/src/main/java/com/wugui/datax/admin/controller/DatasourceQueryController.java index 51e5cc36..873044a3 100644 --- a/datax-admin/src/main/java/com/wugui/datax/admin/controller/DatasourceQueryController.java +++ b/datax-admin/src/main/java/com/wugui/datax/admin/controller/DatasourceQueryController.java @@ -27,6 +27,31 @@ public class DatasourceQueryController extends ApiController { @Autowired private DatasourceQueryService datasourceQueryService; + /** + * 根据数据源id获取mongo库名 + * + * @param datasourceId + * @return + */ + @GetMapping("/getDBs") + @ApiOperation("根据数据源id获取mongo库名") + public R> getDBs(Long datasourceId) throws IOException { + return success(datasourceQueryService.getDBs(datasourceId)); + } + + + /** + * 根据数据源id,dbname获取CollectionNames + * + * @param datasourceId + * @return + */ + @GetMapping("/collectionNames") + @ApiOperation("根据数据源id,dbname获取CollectionNames") + public R> getTableNames(Long datasourceId,String dbName) throws IOException { + return success(datasourceQueryService.getCollectionNames(datasourceId,dbName)); + } + /** * 根据数据源id获取可用表名 * diff --git a/datax-admin/src/main/java/com/wugui/datax/admin/dto/DataxJsonDto.java b/datax-admin/src/main/java/com/wugui/datax/admin/dto/DataxJsonDto.java index deaded07..049e9bfa 100644 --- a/datax-admin/src/main/java/com/wugui/datax/admin/dto/DataxJsonDto.java +++ b/datax-admin/src/main/java/com/wugui/datax/admin/dto/DataxJsonDto.java @@ -10,8 +10,8 @@ import java.util.List; * * @author jingwk * @ClassName DataxJsonDto - * @Version 2.0 - * @since 2020/01/11 17:15 + * @Version 2.1.1 + * @since 2020/03/14 07:15 */ @Data public class DataxJsonDto implements Serializable { @@ -39,4 +39,8 @@ public class DataxJsonDto implements Serializable { private RdbmsReaderDto rdbmsReader; private RdbmsWriterDto rdbmsWriter; + + private MongoDBReaderDto mongoDBReaderDto; + + private MongoDBWriterDto mongoDBWriterDto; } diff --git a/datax-admin/src/main/java/com/wugui/datax/admin/dto/MongoDBReaderDto.java b/datax-admin/src/main/java/com/wugui/datax/admin/dto/MongoDBReaderDto.java new file mode 100644 index 00000000..56575fe4 --- /dev/null +++ b/datax-admin/src/main/java/com/wugui/datax/admin/dto/MongoDBReaderDto.java @@ -0,0 +1,24 @@ +package com.wugui.datax.admin.dto; + +import lombok.Data; + +import java.io.Serializable; + +/** + * 构建mongodb reader dto + * + * @author jingwk + * @ClassName mongodb reader + * @Version 2.1.1 + * @since 2020/03/14 07:15 + */ +@Data +public class MongoDBReaderDto implements Serializable { + + private String address; + + private String dbName; + + private String collectionName; + +} diff --git a/datax-admin/src/main/java/com/wugui/datax/admin/dto/MongoDBWriterDto.java b/datax-admin/src/main/java/com/wugui/datax/admin/dto/MongoDBWriterDto.java new file mode 100644 index 00000000..b90f6cd6 --- /dev/null +++ b/datax-admin/src/main/java/com/wugui/datax/admin/dto/MongoDBWriterDto.java @@ -0,0 +1,32 @@ +package com.wugui.datax.admin.dto; + +import lombok.Data; + +import java.io.Serializable; + +/** + * 构建mongodb write dto + * + * @author jingwk + * @ClassName mongodb write dto + * @Version 2.1.1 + * @since 2020/03/14 07:15 + */ +@Data +public class MongoDBWriterDto implements Serializable { + + private String address; + + private String dbName; + + private String collectionName; + + /** + * 当设置为true时,表示针对相同的upsertKey做更新操作 + */ + private boolean isUpsert; + /** + * upsertKey指定了没行记录的业务主键。用来做更新时使用。 + */ + private String upsertKey; +} diff --git a/datax-admin/src/main/java/com/wugui/datax/admin/service/DatasourceQueryService.java b/datax-admin/src/main/java/com/wugui/datax/admin/service/DatasourceQueryService.java index e0313b0c..62c6bc4a 100644 --- a/datax-admin/src/main/java/com/wugui/datax/admin/service/DatasourceQueryService.java +++ b/datax-admin/src/main/java/com/wugui/datax/admin/service/DatasourceQueryService.java @@ -1,10 +1,11 @@ package com.wugui.datax.admin.service; import java.io.IOException; +import java.net.UnknownHostException; import java.util.List; /** - * 数据库查询服务层 + * 数据库查询服务 * * @author zhouhongfa@gz-yibo.com * @ClassName JdbcDatasourceQueryService @@ -13,6 +14,13 @@ import java.util.List; */ public interface DatasourceQueryService { + /** + * 获取db列表 + * @param id + * @return + */ + List getDBs(Long id) throws UnknownHostException; + /** * 根据数据源表id查询出可用的表 * @@ -21,6 +29,12 @@ public interface DatasourceQueryService { */ List getTables(Long id) throws IOException; + /** + * 获取CollectionNames + * @param dbName + * @return + */ + List getCollectionNames(long id,String dbName) throws UnknownHostException; /** * 根据数据源id,表名查询出该表所有字段 diff --git a/datax-admin/src/main/java/com/wugui/datax/admin/service/impl/DatasourceQueryServiceImpl.java b/datax-admin/src/main/java/com/wugui/datax/admin/service/impl/DatasourceQueryServiceImpl.java index e5957df8..a0850042 100644 --- a/datax-admin/src/main/java/com/wugui/datax/admin/service/impl/DatasourceQueryServiceImpl.java +++ b/datax-admin/src/main/java/com/wugui/datax/admin/service/impl/DatasourceQueryServiceImpl.java @@ -7,16 +7,18 @@ import com.wugui.datax.admin.service.DatasourceQueryService; import com.wugui.datax.admin.service.JobDatasourceService; import com.wugui.datax.admin.tool.query.BaseQueryTool; import com.wugui.datax.admin.tool.query.HBaseQueryTool; +import com.wugui.datax.admin.tool.query.MongoDBQueryTool; import com.wugui.datax.admin.tool.query.QueryToolFactory; -import com.wugui.datax.admin.util.DataSourceConstants; +import com.wugui.datax.admin.util.JdbcConstants; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import java.io.IOException; +import java.net.UnknownHostException; import java.util.List; /** - * TODO + * datasource query * * @author zhouhongfa@gz-yibo.com * @ClassName JdbcDatasourceQueryServiceImpl @@ -29,22 +31,42 @@ public class DatasourceQueryServiceImpl implements DatasourceQueryService { @Autowired private JobDatasourceService jobDatasourceService; + @Override + public List getDBs(Long id) throws UnknownHostException { + //获取数据源对象 + JobDatasource datasource = jobDatasourceService.getById(id); + return MongoDBQueryTool.getInstance(datasource).getDBNames(); + } + + @Override public List getTables(Long id) throws IOException { //获取数据源对象 - JobDatasource jobDatasource = jobDatasourceService.getById(id); + JobDatasource datasource = jobDatasourceService.getById(id); //queryTool组装 - if (ObjectUtil.isNull(jobDatasource)) { + if (ObjectUtil.isNull(datasource)) { return Lists.newArrayList(); } - if (DataSourceConstants.HBASE.equals(jobDatasource.getDatasource())) { - return HBaseQueryTool.getInstance(jobDatasource).getTableNames(); + if (JdbcConstants.HBASE.equals(datasource.getDatasource())) { + return HBaseQueryTool.getInstance(datasource).getTableNames(); } else { - BaseQueryTool qTool = QueryToolFactory.getByDbType(jobDatasource); + BaseQueryTool qTool = QueryToolFactory.getByDbType(datasource); return qTool.getTableNames(); } } + @Override + public List getCollectionNames(long id,String dbName) throws UnknownHostException { + //获取数据源对象 + JobDatasource datasource = jobDatasourceService.getById(id); + //queryTool组装 + if (ObjectUtil.isNull(datasource)) { + return Lists.newArrayList(); + } + return MongoDBQueryTool.getInstance(datasource).getCollectionNames(dbName); + } + + @Override public List getColumns(Long id, String tableName) throws IOException { //获取数据源对象 @@ -53,8 +75,10 @@ public class DatasourceQueryServiceImpl implements DatasourceQueryService { if (ObjectUtil.isNull(datasource)) { return Lists.newArrayList(); } - if (DataSourceConstants.HBASE.equals(datasource.getDatasource())) { + if (JdbcConstants.HBASE.equals(datasource.getDatasource())) { return HBaseQueryTool.getInstance(datasource).getColumns(tableName); + } else if (JdbcConstants.MONGODB.equals(datasource.getDatasource())) { + return MongoDBQueryTool.getInstance(datasource).getColumns(tableName); } else { BaseQueryTool queryTool = QueryToolFactory.getByDbType(datasource); return queryTool.getColumnNames(tableName, datasource.getDatasource()); diff --git a/datax-admin/src/main/java/com/wugui/datax/admin/service/impl/JobDatasourceServiceImpl.java b/datax-admin/src/main/java/com/wugui/datax/admin/service/impl/JobDatasourceServiceImpl.java index 6d0cc89b..89bae99e 100644 --- a/datax-admin/src/main/java/com/wugui/datax/admin/service/impl/JobDatasourceServiceImpl.java +++ b/datax-admin/src/main/java/com/wugui/datax/admin/service/impl/JobDatasourceServiceImpl.java @@ -7,7 +7,7 @@ import com.wugui.datax.admin.service.JobDatasourceService; import com.wugui.datax.admin.tool.query.BaseQueryTool; import com.wugui.datax.admin.tool.query.HBaseQueryTool; import com.wugui.datax.admin.tool.query.QueryToolFactory; -import com.wugui.datax.admin.util.DataSourceConstants; +import com.wugui.datax.admin.util.JdbcConstants; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; @@ -26,7 +26,7 @@ public class JobDatasourceServiceImpl extends ServiceImpl extraParams = Maps.newHashMap(); @@ -91,30 +96,32 @@ public class DataxJsonHelper implements DataxJsonInterface { this.rdbmsReaderDto = dataxJsonDto.getRdbmsReader(); this.hbaseReaderDto = dataxJsonDto.getHbaseReader(); // reader 插件 - String readerDbType = JdbcUtils.getDbType(readerDatasource.getJdbcUrl(), readerDatasource.getJdbcDriverClass()); - if (StringUtils.isEmpty(readerDatasource.getZkAdress())) { - if (JdbcConstants.MYSQL.equals(readerDbType)) { - readerPlugin = new MysqlReader(); - } else if (JdbcConstants.ORACLE.equals(readerDbType)) { - readerPlugin = new OracleReader(); - } else if (JdbcConstants.SQL_SERVER.equals(readerDbType)) { - readerPlugin = new SqlServerReader(); - } else if (JdbcConstants.POSTGRESQL.equals(readerDbType)) { - readerPlugin = new PostgresqlReader(); - } else if (JdbcConstants.HIVE.equals(readerDbType)) { - readerPlugin = new HiveReader(); - buildReader = buildHiveReader(); - } - if (!JdbcConstants.HIVE.equals(readerDbType)) { - buildReader = this.buildReader(); - } - } else { + String datasource = readerDatasource.getDatasource(); + if (JdbcConstants.MYSQL.equals(datasource)) { + readerPlugin = new MysqlReader(); + buildReader = buildReader(); + } else if (JdbcConstants.ORACLE.equals(datasource)) { + readerPlugin = new OracleReader(); + buildReader = buildReader(); + } else if (JdbcConstants.SQL_SERVER.equals(datasource)) { + readerPlugin = new SqlServerReader(); + buildReader = buildReader(); + } else if (JdbcConstants.POSTGRESQL.equals(datasource)) { + readerPlugin = new PostgresqlReader(); + buildReader = buildReader(); + } else if (JdbcConstants.HIVE.equals(datasource)) { + readerPlugin = new HiveReader(); + buildReader = buildHiveReader(); + } else if (JdbcConstants.HBASE.equals(datasource)) { readerPlugin = new HBaseReader(); buildReader = buildHBaseReader(); + } else if (JdbcConstants.MONGODB.equals(datasource)) { + readerPlugin = new MongoDBReader(); + buildReader = buildMongoDBReader(); } - } + public void initWriter(DataxJsonDto dataxJsonDto, JobDatasource readerDatasource) { this.writerDatasource = readerDatasource; this.writerTables = dataxJsonDto.getWriterTables(); @@ -122,27 +129,30 @@ public class DataxJsonHelper implements DataxJsonInterface { this.hiveWriterDto = dataxJsonDto.getHiveWriter(); this.rdbmsWriterDto = dataxJsonDto.getRdbmsWriter(); this.hbaseWriterDto = dataxJsonDto.getHbaseWriter(); + this.mongoDBWriterDto=dataxJsonDto.getMongoDBWriterDto(); // writer - String writerDbType = JdbcUtils.getDbType(writerDatasource.getJdbcUrl(), writerDatasource.getJdbcDriverClass()); - if (StringUtils.isEmpty(readerDatasource.getZkAdress())) { - if (JdbcConstants.MYSQL.equals(writerDbType)) { - writerPlugin = new MysqlWriter(); - } else if (JdbcConstants.ORACLE.equals(writerDbType)) { - writerPlugin = new OraclelWriter(); - } else if (JdbcConstants.SQL_SERVER.equals(writerDbType)) { - writerPlugin = new SqlServerlWriter(); - } else if (JdbcConstants.POSTGRESQL.equals(writerDbType)) { - writerPlugin = new PostgresqllWriter(); - } else if (JdbcConstants.HIVE.equals(writerDbType)) { - writerPlugin = new HiveWriter(); - buildWriter = this.buildHiveWriter(); - } - if (!JdbcConstants.HIVE.equals(writerDbType)) { - buildWriter = this.buildWriter(); - } - } else { + String datasource = readerDatasource.getDatasource(); + if (JdbcConstants.MYSQL.equals(datasource)) { + writerPlugin = new MysqlWriter(); + buildWriter = this.buildWriter(); + } else if (JdbcConstants.ORACLE.equals(datasource)) { + writerPlugin = new OraclelWriter(); + buildWriter = this.buildWriter(); + } else if (JdbcConstants.SQL_SERVER.equals(datasource)) { + writerPlugin = new SqlServerlWriter(); + buildWriter = this.buildWriter(); + } else if (JdbcConstants.POSTGRESQL.equals(datasource)) { + writerPlugin = new PostgresqllWriter(); + buildWriter = this.buildWriter(); + } else if (JdbcConstants.HIVE.equals(datasource)) { + writerPlugin = new HiveWriter(); + buildWriter = this.buildHiveWriter(); + } else if (JdbcConstants.HBASE.equals(datasource)) { writerPlugin = new HBaseWriter(); - buildWriter = buildHBaseWriter(); + buildWriter = this.buildHBaseWriter(); + }else if (JdbcConstants.MONGODB.equals(datasource)) { + writerPlugin = new MongoDBWriter(); + buildWriter = this.buildMongoDBWriter(); } } @@ -230,6 +240,22 @@ public class DataxJsonHelper implements DataxJsonInterface { dataxHbasePojo.setReaderRange(hbaseReaderDto.getReaderRange()); return readerPlugin.buildHbase(dataxHbasePojo); } + + + @Override + public Map buildMongoDBReader() { + DataxMongoDBPojo dataxMongoDBPojo = new DataxMongoDBPojo(); + dataxMongoDBPojo.setJdbcDatasource(readerDatasource); + List> columns = Lists.newArrayList(); + buildColumns(readerColumns, columns); + dataxMongoDBPojo.setColumns(columns); + dataxMongoDBPojo.setAddress(readerDatasource.getJdbcUrl()); + dataxMongoDBPojo.setDbName(mongoDBReaderDto.getDbName()); + dataxMongoDBPojo.setCollectionName(mongoDBReaderDto.getCollectionName()); + return readerPlugin.buildMongoDB(dataxMongoDBPojo); + } + + @Override public Map buildWriter() { DataxRdbmsPojo dataxPluginPojo = new DataxRdbmsPojo(); @@ -245,12 +271,7 @@ public class DataxJsonHelper implements DataxJsonInterface { DataxHivePojo dataxHivePojo = new DataxHivePojo(); dataxHivePojo.setJdbcDatasource(writerDatasource); List> columns = Lists.newArrayList(); - writerColumns.forEach(c -> { - Map column = Maps.newLinkedHashMap(); - column.put("name", c.split(Constant.SPLIT_SCOLON)[1]); - column.put("type", c.split(Constant.SPLIT_SCOLON)[2]); - columns.add(column); - }); + buildColumns(writerColumns, columns); dataxHivePojo.setColumns(columns); dataxHivePojo.setWriterDefaultFS(hiveWriterDto.getWriterDefaultFS()); dataxHivePojo.setWriteFieldDelimiter(hiveWriterDto.getWriteFieldDelimiter()); @@ -261,14 +282,23 @@ public class DataxJsonHelper implements DataxJsonInterface { return writerPlugin.buildHive(dataxHivePojo); } + private void buildColumns(List columns, List> returnColumns) { + columns.forEach(c -> { + Map column = Maps.newLinkedHashMap(); + column.put("name", c.split(Constant.SPLIT_SCOLON)[1]); + column.put("type", c.split(Constant.SPLIT_SCOLON)[2]); + returnColumns.add(column); + }); + } + @Override public Map buildHBaseWriter() { DataxHbasePojo dataxHbasePojo = new DataxHbasePojo(); dataxHbasePojo.setJdbcDatasource(writerDatasource); List> columns = Lists.newArrayList(); - for(int i =0; i column = Maps.newLinkedHashMap(); - column.put("index",i); + column.put("index", i); column.put("name", writerColumns.get(i)); column.put("type", "string"); columns.add(column); @@ -281,4 +311,20 @@ public class DataxJsonHelper implements DataxJsonInterface { dataxHbasePojo.setWriterMode(hbaseWriterDto.getWriterMode()); return writerPlugin.buildHbase(dataxHbasePojo); } + + + @Override + public Map buildMongoDBWriter() { + DataxMongoDBPojo dataxMongoDBPojo = new DataxMongoDBPojo(); + dataxMongoDBPojo.setJdbcDatasource(writerDatasource); + List> columns = Lists.newArrayList(); + buildColumns(writerColumns, columns); + dataxMongoDBPojo.setColumns(columns); + dataxMongoDBPojo.setAddress(writerDatasource.getJdbcUrl()); + dataxMongoDBPojo.setDbName(mongoDBWriterDto.getDbName()); + dataxMongoDBPojo.setCollectionName(mongoDBWriterDto.getCollectionName()); + dataxMongoDBPojo.setUpsert(mongoDBWriterDto.isUpsert()); + dataxMongoDBPojo.setUpsertKey(mongoDBWriterDto.getUpsertKey()); + return writerPlugin.buildMongoDB(dataxMongoDBPojo); + } } diff --git a/datax-admin/src/main/java/com/wugui/datax/admin/tool/datax/DataxJsonInterface.java b/datax-admin/src/main/java/com/wugui/datax/admin/tool/datax/DataxJsonInterface.java index 5ddfe8b8..0e971478 100644 --- a/datax-admin/src/main/java/com/wugui/datax/admin/tool/datax/DataxJsonInterface.java +++ b/datax-admin/src/main/java/com/wugui/datax/admin/tool/datax/DataxJsonInterface.java @@ -5,10 +5,10 @@ import java.util.Map; /** * 构建 com.wugui.datax json的基础接口 * - * @author zhouhongfa@gz-yibo.com + * @author jingwk * @ClassName DataxJsonHelper - * @Version 1.0 - * @since 2019/7/30 22:24 + * @Version 2.1.1 + * @since 2020/03/14 12:24 */ public interface DataxJsonInterface { @@ -28,5 +28,9 @@ public interface DataxJsonInterface { Map buildHBaseWriter(); + Map buildMongoDBReader(); + + Map buildMongoDBWriter(); + Map buildWriter(); } diff --git a/datax-admin/src/main/java/com/wugui/datax/admin/tool/datax/DataxPluginInterface.java b/datax-admin/src/main/java/com/wugui/datax/admin/tool/datax/DataxPluginInterface.java index d4da7670..c6edbf44 100644 --- a/datax-admin/src/main/java/com/wugui/datax/admin/tool/datax/DataxPluginInterface.java +++ b/datax-admin/src/main/java/com/wugui/datax/admin/tool/datax/DataxPluginInterface.java @@ -2,6 +2,7 @@ package com.wugui.datax.admin.tool.datax; import com.wugui.datax.admin.tool.pojo.DataxHbasePojo; import com.wugui.datax.admin.tool.pojo.DataxHivePojo; +import com.wugui.datax.admin.tool.pojo.DataxMongoDBPojo; import com.wugui.datax.admin.tool.pojo.DataxRdbmsPojo; import java.util.Map; @@ -43,6 +44,14 @@ public interface DataxPluginInterface { * @return */ Map buildHbase(DataxHbasePojo dataxHbasePojo); + + /** + * mongodb json构建 + * @param dataxMongoDBPojo + * @return + */ + Map buildMongoDB(DataxMongoDBPojo dataxMongoDBPojo); + /** * 获取示例 * diff --git a/datax-admin/src/main/java/com/wugui/datax/admin/tool/datax/reader/BaseReaderPlugin.java b/datax-admin/src/main/java/com/wugui/datax/admin/tool/datax/reader/BaseReaderPlugin.java index 93ee1a4c..5c8ea374 100644 --- a/datax-admin/src/main/java/com/wugui/datax/admin/tool/datax/reader/BaseReaderPlugin.java +++ b/datax-admin/src/main/java/com/wugui/datax/admin/tool/datax/reader/BaseReaderPlugin.java @@ -7,6 +7,7 @@ import com.wugui.datax.admin.entity.JobDatasource; import com.wugui.datax.admin.tool.datax.BaseDataxPlugin; import com.wugui.datax.admin.tool.pojo.DataxHbasePojo; import com.wugui.datax.admin.tool.pojo.DataxHivePojo; +import com.wugui.datax.admin.tool.pojo.DataxMongoDBPojo; import com.wugui.datax.admin.tool.pojo.DataxRdbmsPojo; import org.apache.commons.lang3.StringUtils; @@ -65,4 +66,9 @@ public abstract class BaseReaderPlugin extends BaseDataxPlugin { @Override public Map buildHbase(DataxHbasePojo dataxHbasePojo) { return null; } + + @Override + public Map buildMongoDB(DataxMongoDBPojo dataxMongoDBPojo) { + return null; + } } diff --git a/datax-admin/src/main/java/com/wugui/datax/admin/tool/datax/reader/MongoDBReader.java b/datax-admin/src/main/java/com/wugui/datax/admin/tool/datax/reader/MongoDBReader.java new file mode 100644 index 00000000..c142dd83 --- /dev/null +++ b/datax-admin/src/main/java/com/wugui/datax/admin/tool/datax/reader/MongoDBReader.java @@ -0,0 +1,32 @@ +package com.wugui.datax.admin.tool.datax.reader; + +import com.google.common.collect.Maps; +import com.wugui.datax.admin.tool.pojo.DataxMongoDBPojo; + +import java.util.Map; + +public class MongoDBReader extends BaseReaderPlugin implements DataxReaderInterface { + @Override + public String getName() { + return "mongodbreader"; + } + + @Override + public Map sample() { + return null; + } + + public Map buildMongoDB(DataxMongoDBPojo plugin) { + //构建 + Map readerObj = Maps.newLinkedHashMap(); + readerObj.put("name", getName()); + Map parameterObj = Maps.newLinkedHashMap(); + parameterObj.put("address", plugin.getAddress()); + parameterObj.put("userName", plugin.getJdbcDatasource().getJdbcUsername()); + parameterObj.put("userPassword", plugin.getJdbcDatasource().getJdbcPassword()); + parameterObj.put("dbName", plugin.getDbName()); + parameterObj.put("collectionName", plugin.getCollectionName()); + readerObj.put("parameter", parameterObj); + return readerObj; + } +} diff --git a/datax-admin/src/main/java/com/wugui/datax/admin/tool/datax/reader/OracleReader.java b/datax-admin/src/main/java/com/wugui/datax/admin/tool/datax/reader/OracleReader.java index 926536fb..1fe7332e 100644 --- a/datax-admin/src/main/java/com/wugui/datax/admin/tool/datax/reader/OracleReader.java +++ b/datax-admin/src/main/java/com/wugui/datax/admin/tool/datax/reader/OracleReader.java @@ -1,5 +1,7 @@ package com.wugui.datax.admin.tool.datax.reader; +import com.wugui.datax.admin.tool.pojo.DataxMongoDBPojo; + import java.util.Map; /** diff --git a/datax-admin/src/main/java/com/wugui/datax/admin/tool/datax/writer/BaseWriterPlugin.java b/datax-admin/src/main/java/com/wugui/datax/admin/tool/datax/writer/BaseWriterPlugin.java index 7a0f352d..be6d853d 100644 --- a/datax-admin/src/main/java/com/wugui/datax/admin/tool/datax/writer/BaseWriterPlugin.java +++ b/datax-admin/src/main/java/com/wugui/datax/admin/tool/datax/writer/BaseWriterPlugin.java @@ -6,6 +6,7 @@ import com.wugui.datax.admin.entity.JobDatasource; import com.wugui.datax.admin.tool.datax.BaseDataxPlugin; import com.wugui.datax.admin.tool.pojo.DataxHbasePojo; import com.wugui.datax.admin.tool.pojo.DataxHivePojo; +import com.wugui.datax.admin.tool.pojo.DataxMongoDBPojo; import com.wugui.datax.admin.tool.pojo.DataxRdbmsPojo; import java.util.Map; @@ -50,5 +51,12 @@ public abstract class BaseWriterPlugin extends BaseDataxPlugin { @Override - public Map buildHbase(DataxHbasePojo dataxHbasePojo) { return null; } + public Map buildHbase(DataxHbasePojo dataxHbasePojo) { + return null; + } + + @Override + public Map buildMongoDB(DataxMongoDBPojo plugin) { + return null; + } } diff --git a/datax-admin/src/main/java/com/wugui/datax/admin/tool/datax/writer/HBaseWriter.java b/datax-admin/src/main/java/com/wugui/datax/admin/tool/datax/writer/HBaseWriter.java index 0ab28a30..819b3d7a 100644 --- a/datax-admin/src/main/java/com/wugui/datax/admin/tool/datax/writer/HBaseWriter.java +++ b/datax-admin/src/main/java/com/wugui/datax/admin/tool/datax/writer/HBaseWriter.java @@ -19,8 +19,8 @@ public class HBaseWriter extends BaseWriterPlugin implements DataxWriterInterfac public Map buildHbase(DataxHbasePojo plugin) { //构建 - Map readerObj = Maps.newLinkedHashMap(); - readerObj.put("name", getName()); + Map writerObj = Maps.newLinkedHashMap(); + writerObj.put("name", getName()); Map parameterObj = Maps.newLinkedHashMap(); Map confige = Maps.newLinkedHashMap(); confige.put("hbase.zookeeper.quorum", plugin.getWriterHbaseConfig()); @@ -34,7 +34,7 @@ public class HBaseWriter extends BaseWriterPlugin implements DataxWriterInterfac if (StringUtils.isNotBlank(plugin.getWriterVersionColumn().getValue())) { parameterObj.put("versionColumn", plugin.getWriterVersionColumn()); } - readerObj.put("parameter", parameterObj); - return readerObj; + writerObj.put("parameter", parameterObj); + return writerObj; } } diff --git a/datax-admin/src/main/java/com/wugui/datax/admin/tool/datax/writer/MongoDBWriter.java b/datax-admin/src/main/java/com/wugui/datax/admin/tool/datax/writer/MongoDBWriter.java new file mode 100644 index 00000000..f30127f6 --- /dev/null +++ b/datax-admin/src/main/java/com/wugui/datax/admin/tool/datax/writer/MongoDBWriter.java @@ -0,0 +1,40 @@ +package com.wugui.datax.admin.tool.datax.writer; + +import com.google.common.collect.Maps; +import com.wugui.datax.admin.tool.pojo.DataxMongoDBPojo; + +import java.util.Map; + +public class MongoDBWriter extends BaseWriterPlugin implements DataxWriterInterface { + @Override + public String getName() { + return "mongodbwriter"; + } + + @Override + public Map sample() { + return null; + } + + + + @Override + public Map buildMongoDB(DataxMongoDBPojo plugin) { + //构建 + Map writerObj = Maps.newLinkedHashMap(); + writerObj.put("name", getName()); + Map parameterObj = Maps.newLinkedHashMap(); + parameterObj.put("address", plugin.getAddress()); + parameterObj.put("userName", plugin.getJdbcDatasource().getJdbcUsername()); + parameterObj.put("userPassword", plugin.getJdbcDatasource().getJdbcPassword()); + parameterObj.put("dbName", plugin.getDbName()); + parameterObj.put("collectionName", plugin.getCollectionName()); + writerObj.put("parameter", parameterObj); + + Map upsertInfo = Maps.newLinkedHashMap(); + upsertInfo.put("isUpsert", plugin.isUpsert()); + parameterObj.put("upsertKey", plugin.getUpsertKey()); + writerObj.put("upsertInfo", upsertInfo); + return writerObj; + } +} diff --git a/datax-admin/src/main/java/com/wugui/datax/admin/tool/datax/writer/SqlServerlWriter.java b/datax-admin/src/main/java/com/wugui/datax/admin/tool/datax/writer/SqlServerlWriter.java index 5a8775da..a928fd49 100644 --- a/datax-admin/src/main/java/com/wugui/datax/admin/tool/datax/writer/SqlServerlWriter.java +++ b/datax-admin/src/main/java/com/wugui/datax/admin/tool/datax/writer/SqlServerlWriter.java @@ -1,5 +1,6 @@ package com.wugui.datax.admin.tool.datax.writer; + import java.util.Map; /** @@ -15,7 +16,6 @@ public class SqlServerlWriter extends BaseWriterPlugin implements DataxWriterInt return "sqlserverwriter"; } - @Override public Map sample() { return null; diff --git a/datax-admin/src/main/java/com/wugui/datax/admin/tool/meta/DatabaseMetaFactory.java b/datax-admin/src/main/java/com/wugui/datax/admin/tool/meta/DatabaseMetaFactory.java index f81650db..082d80e1 100644 --- a/datax-admin/src/main/java/com/wugui/datax/admin/tool/meta/DatabaseMetaFactory.java +++ b/datax-admin/src/main/java/com/wugui/datax/admin/tool/meta/DatabaseMetaFactory.java @@ -1,5 +1,6 @@ package com.wugui.datax.admin.tool.meta; -import com.alibaba.druid.util.JdbcConstants; + +import com.wugui.datax.admin.util.JdbcConstants; /** * meta信息工厂 diff --git a/datax-admin/src/main/java/com/wugui/datax/admin/tool/pojo/DataxMongoDBPojo.java b/datax-admin/src/main/java/com/wugui/datax/admin/tool/pojo/DataxMongoDBPojo.java new file mode 100644 index 00000000..85397367 --- /dev/null +++ b/datax-admin/src/main/java/com/wugui/datax/admin/tool/pojo/DataxMongoDBPojo.java @@ -0,0 +1,45 @@ +package com.wugui.datax.admin.tool.pojo; + +import com.wugui.datax.admin.entity.JobDatasource; +import lombok.Data; + +import java.util.List; +import java.util.Map; + +/** + * 用于传参,构建json + * + * @author jingwk + * @ClassName DataxMongoDBPojo + * @Version 2.0 + * @since 2020/03/14 11:15 + */ +@Data +public class DataxMongoDBPojo { + + /** + * hive列名 + */ + private List> columns; + + /** + * 数据源信息 + */ + private JobDatasource jdbcDatasource; + + private String address; + + private String dbName; + + private String collectionName; + + /** + * 当设置为true时,表示针对相同的upsertKey做更新操作 + */ + private boolean isUpsert; + /** + * upsertKey指定了没行记录的业务主键。用来做更新时使用。 + */ + private String upsertKey; +} + diff --git a/datax-admin/src/main/java/com/wugui/datax/admin/tool/query/BaseQueryTool.java b/datax-admin/src/main/java/com/wugui/datax/admin/tool/query/BaseQueryTool.java index b7415806..d3f5e9d8 100644 --- a/datax-admin/src/main/java/com/wugui/datax/admin/tool/query/BaseQueryTool.java +++ b/datax-admin/src/main/java/com/wugui/datax/admin/tool/query/BaseQueryTool.java @@ -1,8 +1,7 @@ package com.wugui.datax.admin.tool.query; import cn.hutool.core.util.StrUtil; -import com.alibaba.druid.util.JdbcConstants; -import com.alibaba.druid.util.JdbcUtils; + import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; import com.wugui.datatx.core.util.Constant; @@ -13,6 +12,8 @@ import com.wugui.datax.admin.tool.database.DasColumn; import com.wugui.datax.admin.tool.database.TableInfo; import com.wugui.datax.admin.tool.meta.DatabaseInterface; import com.wugui.datax.admin.tool.meta.DatabaseMetaFactory; +import com.wugui.datax.admin.util.JdbcConstants; +import com.wugui.datax.admin.util.JdbcUtils; import com.zaxxer.hikari.HikariDataSource; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -53,7 +54,6 @@ public abstract class BaseQueryTool implements QueryToolInterface { * @param jobDatasource */ BaseQueryTool(JobDatasource jobDatasource) throws SQLException { - String currentDbType = JdbcUtils.getDbType(jobDatasource.getJdbcUrl(), jobDatasource.getJdbcDriverClass()); if (LocalCacheUtil.get(jobDatasource.getDatasourceName()) == null) { getDataSource(jobDatasource); } else { @@ -63,7 +63,7 @@ public abstract class BaseQueryTool implements QueryToolInterface { getDataSource(jobDatasource); } } - sqlBuilder = DatabaseMetaFactory.getByDbType(currentDbType); + sqlBuilder = DatabaseMetaFactory.getByDbType(jobDatasource.getDatasource()); currentSchema = getSchema(jobDatasource.getJdbcUsername()); LocalCacheUtil.set(jobDatasource.getDatasourceName(), this.connection, 4 * 60 * 60 * 1000); } diff --git a/datax-admin/src/main/java/com/wugui/datax/admin/tool/query/MongoDBQueryTool.java b/datax-admin/src/main/java/com/wugui/datax/admin/tool/query/MongoDBQueryTool.java index 0b786b24..2581ccbf 100644 --- a/datax-admin/src/main/java/com/wugui/datax/admin/tool/query/MongoDBQueryTool.java +++ b/datax-admin/src/main/java/com/wugui/datax/admin/tool/query/MongoDBQueryTool.java @@ -1,18 +1,18 @@ package com.wugui.datax.admin.tool.query; -import com.mongodb.BasicDBObject; -import com.mongodb.Block; -import com.mongodb.MongoClient; -import com.mongodb.MongoClientURI; +import com.mongodb.*; import com.mongodb.client.MongoCollection; import com.mongodb.client.MongoDatabase; import com.mongodb.client.MongoIterable; import com.wugui.datax.admin.entity.JobDatasource; +import org.apache.commons.lang3.StringUtils; import org.bson.Document; import java.io.IOException; +import java.net.UnknownHostException; import java.util.ArrayList; +import java.util.Arrays; import java.util.List; @@ -25,9 +25,15 @@ public class MongoDBQueryTool { - MongoDBQueryTool(JobDatasource jobDatasource) { + MongoDBQueryTool(JobDatasource jobDatasource) throws UnknownHostException { if (mongoClient == null){ - mongoClient = new MongoClient(new MongoClientURI(jobDatasource.getJdbcUrl())); + if(StringUtils.isBlank(jobDatasource.getJdbcUsername()) && StringUtils.isBlank(jobDatasource.getJdbcPassword())){ + mongoClient = new MongoClient(jobDatasource.getJdbcUrl()); + }else{ + MongoCredential credential = MongoCredential.createCredential(jobDatasource.getJdbcUsername(), null, jobDatasource.getJdbcPassword().toCharArray()); + mongoClient = new MongoClient(parseServerAddress(jobDatasource.getJdbcUrl()), Arrays.asList(credential)); + } + } } @@ -35,7 +41,7 @@ public class MongoDBQueryTool { * 获得该类的实例,单例模式 * @return */ - public static MongoDBQueryTool getInstance(JobDatasource jobDatasource) throws IOException { + public static MongoDBQueryTool getInstance(JobDatasource jobDatasource) throws UnknownHostException { if (instance == null) { synchronized(MongoDBQueryTool.class){ if(instance == null){ @@ -98,4 +104,39 @@ public class MongoDBQueryTool { }); return list; } + + /** + * 判断地址类型是否符合要求 + * @param addressList + * @return + */ + private static boolean isHostPortPattern(List addressList) { + for(Object address : addressList) { + String regex = "(\\S+):([0-9]+)"; + if(!((String)address).matches(regex)) { + return false; + } + } + return true; + } + + /** + * 转换为mongo地址协议 + * @param rawAddress + * @return + */ + private static List parseServerAddress(String rawAddress) throws UnknownHostException { + List addressList = new ArrayList<>(); + for(String address : Arrays.asList(rawAddress.split(","))) { + String[] tempAddress = address.split(":"); + try { + ServerAddress sa = new ServerAddress(tempAddress[0],Integer.valueOf(tempAddress[1])); + addressList.add(sa); + } catch (Exception e) { + throw new UnknownHostException(); + } + } + return addressList; + } + } diff --git a/datax-admin/src/main/java/com/wugui/datax/admin/tool/query/QueryToolFactory.java b/datax-admin/src/main/java/com/wugui/datax/admin/tool/query/QueryToolFactory.java index 76157113..2ea341da 100644 --- a/datax-admin/src/main/java/com/wugui/datax/admin/tool/query/QueryToolFactory.java +++ b/datax-admin/src/main/java/com/wugui/datax/admin/tool/query/QueryToolFactory.java @@ -1,8 +1,7 @@ package com.wugui.datax.admin.tool.query; -import com.alibaba.druid.util.JdbcConstants; -import com.alibaba.druid.util.JdbcUtils; import com.wugui.datax.admin.entity.JobDatasource; +import com.wugui.datax.admin.util.JdbcConstants; import com.wugui.datax.admin.util.RdbmsException; import java.sql.SQLException; @@ -17,21 +16,21 @@ import java.sql.SQLException; */ public class QueryToolFactory { - public static final BaseQueryTool getByDbType(JobDatasource jobJdbcDatasource) { + public static final BaseQueryTool getByDbType(JobDatasource jobDatasource) { //获取dbType - String dbType = JdbcUtils.getDbType(jobJdbcDatasource.getJdbcUrl(), jobJdbcDatasource.getJdbcDriverClass()); - if (JdbcConstants.MYSQL.equals(dbType)) { - return getMySQLQueryToolInstance(jobJdbcDatasource); - } else if (JdbcConstants.ORACLE.equals(dbType)) { - return getOracleQueryToolInstance(jobJdbcDatasource); - } else if (JdbcConstants.POSTGRESQL.equals(dbType)) { - return getPostgresqlQueryToolInstance(jobJdbcDatasource); - } else if (JdbcConstants.SQL_SERVER.equals(dbType)) { - return getSqlserverQueryToolInstance(jobJdbcDatasource); - }else if (JdbcConstants.HIVE.equals(dbType)) { - return getHiveQueryToolInstance(jobJdbcDatasource); + String datasource = jobDatasource.getDatasource(); + if (JdbcConstants.MYSQL.equals(datasource)) { + return getMySQLQueryToolInstance(jobDatasource); + } else if (JdbcConstants.ORACLE.equals(datasource)) { + return getOracleQueryToolInstance(jobDatasource); + } else if (JdbcConstants.POSTGRESQL.equals(datasource)) { + return getPostgresqlQueryToolInstance(jobDatasource); + } else if (JdbcConstants.SQL_SERVER.equals(datasource)) { + return getSqlserverQueryToolInstance(jobDatasource); + }else if (JdbcConstants.HIVE.equals(datasource)) { + return getHiveQueryToolInstance(jobDatasource); } - throw new UnsupportedOperationException("找不到该类型: ".concat(dbType)); + throw new UnsupportedOperationException("找不到该类型: ".concat(datasource)); } private static BaseQueryTool getMySQLQueryToolInstance(JobDatasource jdbcDatasource) { diff --git a/datax-admin/src/main/java/com/wugui/datax/admin/util/DataSourceConstants.java b/datax-admin/src/main/java/com/wugui/datax/admin/util/DataSourceConstants.java deleted file mode 100644 index d4fc8040..00000000 --- a/datax-admin/src/main/java/com/wugui/datax/admin/util/DataSourceConstants.java +++ /dev/null @@ -1,10 +0,0 @@ -package com.wugui.datax.admin.util; - -import com.alibaba.druid.util.JdbcConstants; - -public interface DataSourceConstants extends JdbcConstants { - - String HBASE = "hbase"; - - String HBASE_ZK_QUORUM = "hbase.zookeeper.quorum"; -} diff --git a/datax-admin/src/main/java/com/wugui/datax/admin/util/JdbcConstants.java b/datax-admin/src/main/java/com/wugui/datax/admin/util/JdbcConstants.java new file mode 100644 index 00000000..a3d8340c --- /dev/null +++ b/datax-admin/src/main/java/com/wugui/datax/admin/util/JdbcConstants.java @@ -0,0 +1,119 @@ +package com.wugui.datax.admin.util; + + +/** + * JdbcConstants + * + * @author jingwk + * @ClassName JdbcConstants + * @Version 2.1.1 + * @since 2020/03/14 07:15 + */ +public interface JdbcConstants { + + + String HBASE_ZK_QUORUM = "hbase.zookeeper.quorum"; + + String MONGODB ="mongodb"; + + String JTDS = "jtds"; + + String MOCK = "mock"; + + String HSQL = "hsql"; + + String DB2 = "db2"; + + String DB2_DRIVER = "com.ibm.db2.jcc.DB2Driver"; + + String POSTGRESQL = "postgresql"; + String POSTGRESQL_DRIVER = "org.postgresql.Driver"; + + String SYBASE = "sybase"; + + String SQL_SERVER = "sqlserver"; + String SQL_SERVER_DRIVER = "com.microsoft.jdbc.sqlserver.SQLServerDriver"; + String SQL_SERVER_DRIVER_SQLJDBC4 = "com.microsoft.sqlserver.jdbc.SQLServerDriver"; + String SQL_SERVER_DRIVER_JTDS = "net.sourceforge.jtds.jdbc.Driver"; + + String ORACLE = "oracle"; + String ORACLE_DRIVER = "oracle.jdbc.OracleDriver"; + String ORACLE_DRIVER2 = "oracle.jdbc.driver.OracleDriver"; + + String ALI_ORACLE = "AliOracle"; + String ALI_ORACLE_DRIVER = "com.alibaba.jdbc.AlibabaDriver"; + + String MYSQL = "mysql"; + String MYSQL_DRIVER = "com.mysql.jdbc.Driver"; + String MYSQL_DRIVER_6 = "com.mysql.cj.jdbc.Driver"; + String MYSQL_DRIVER_REPLICATE = "com.mysql.jdbc."; + + String MARIADB = "mariadb"; + String MARIADB_DRIVER = "org.mariadb.jdbc.Driver"; + + String DERBY = "derby"; + + String HBASE = "hbase"; + + String HIVE = "hive"; + String HIVE_DRIVER = "org.apache.hive.jdbc.HiveDriver"; + + String H2 = "h2"; + String H2_DRIVER = "org.h2.Driver"; + + String DM = "dm"; + String DM_DRIVER = "dm.jdbc.driver.DmDriver"; + + String KINGBASE = "kingbase"; + String KINGBASE_DRIVER = "com.kingbase.Driver"; + + String GBASE = "gbase"; + String GBASE_DRIVER = "com.gbase.jdbc.Driver"; + + String XUGU = "xugu"; + String XUGU_DRIVER = "com.xugu.cloudjdbc.Driver"; + + String OCEANBASE = "oceanbase"; + String OCEANBASE_DRIVER = "com.mysql.jdbc.Driver"; + String INFORMIX = "informix"; + + /** + * 阿里云odps + */ + String ODPS = "odps"; + String ODPS_DRIVER = "com.aliyun.odps.jdbc.OdpsDriver"; + + String TERADATA = "teradata"; + String TERADATA_DRIVER = "com.teradata.jdbc.TeraDriver"; + + /** + * Log4JDBC + */ + String LOG4JDBC = "log4jdbc"; + String LOG4JDBC_DRIVER = "net.sf.log4jdbc.DriverSpy"; + + String PHOENIX = "phoenix"; + String PHOENIX_DRIVER = "org.apache.phoenix.jdbc.PhoenixDriver"; + String ENTERPRISEDB = "edb"; + String ENTERPRISEDB_DRIVER = "com.edb.Driver"; + + String KYLIN = "kylin"; + String KYLIN_DRIVER = "org.apache.kylin.jdbc.Driver"; + + + String SQLITE = "sqlite"; + String SQLITE_DRIVER = "org.sqlite.JDBC"; + + String ALIYUN_ADS = "aliyun_ads"; + String ALIYUN_DRDS = "aliyun_drds"; + + String PRESTO = "presto"; + String PRESTO_DRIVER = "com.facebook.presto.jdbc.PrestoDriver"; + + String ELASTIC_SEARCH = "elastic_search"; + + String ELASTIC_SEARCH_DRIVER = "com.alibaba.xdriver.elastic.jdbc.ElasticDriver"; + + String CLICKHOUSE = "clickhouse"; + String CLICKHOUSE_DRIVER = "ru.yandex.clickhouse.ClickHouseDriver"; +} diff --git a/datax-admin/src/main/java/com/wugui/datax/admin/util/JdbcUtils.java b/datax-admin/src/main/java/com/wugui/datax/admin/util/JdbcUtils.java new file mode 100644 index 00000000..e27ac84d --- /dev/null +++ b/datax-admin/src/main/java/com/wugui/datax/admin/util/JdbcUtils.java @@ -0,0 +1,683 @@ +package com.wugui.datax.admin.util; + + +import org.apache.xerces.impl.dv.util.HexBin; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.sql.DataSource; +import java.io.Closeable; +import java.io.InputStream; +import java.io.PrintStream; +import java.net.URL; +import java.sql.Date; +import java.sql.Blob; +import java.sql.Clob; +import java.sql.Connection; +import java.sql.Driver; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.ResultSetMetaData; +import java.sql.SQLException; +import java.sql.Statement; +import java.sql.Types; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.Enumeration; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Properties; + + +/** + * jdbc utils + * + * @author jingwk + * @ClassName JdbcUtils + * @Version 2.1.1 + * @since 2020/03/14 07:15 + */ +public final class JdbcUtils implements JdbcConstants { + + private static Logger LOG = LoggerFactory.getLogger(JdbcUtils.class); + + + private static final Properties DRIVER_URL_MAPPING = new Properties(); + + private static Boolean mysql_driver_version_6 = null; + + static { + try { + ClassLoader ctxClassLoader = Thread.currentThread().getContextClassLoader(); + if (ctxClassLoader != null) { + for (Enumeration e = ctxClassLoader.getResources("META-INF/druid-driver.properties"); e.hasMoreElements();) { + URL url = e.nextElement(); + + Properties property = new Properties(); + + InputStream is = null; + try { + is = url.openStream(); + property.load(is); + } finally { + JdbcUtils.close(is); + } + + DRIVER_URL_MAPPING.putAll(property); + } + } + } catch (Exception e) { + LOG.error("load druid-driver.properties error", e); + } + } + + public static void close(Connection x) { + if (x == null) { + return; + } + try { + x.close(); + } catch (Exception e) { + LOG.debug("close connection error", e); + } + } + + public static void close(Statement x) { + if (x == null) { + return; + } + try { + x.close(); + } catch (Exception e) { + LOG.debug("close statement error", e); + } + } + + public static void close(ResultSet x) { + if (x == null) { + return; + } + try { + x.close(); + } catch (Exception e) { + LOG.debug("close result set error", e); + } + } + + public static void close(Closeable x) { + if (x == null) { + return; + } + + try { + x.close(); + } catch (Exception e) { + LOG.debug("close error", e); + } + } + + public static void close(Blob x) { + if (x == null) { + return; + } + + try { + x.free(); + } catch (Exception e) { + LOG.debug("close error", e); + } + } + + public static void close(Clob x) { + if (x == null) { + return; + } + + try { + x.free(); + } catch (Exception e) { + LOG.debug("close error", e); + } + } + + public static void printResultSet(ResultSet rs) throws SQLException { + printResultSet(rs, System.out); + } + + public static void printResultSet(ResultSet rs, PrintStream out) throws SQLException { + printResultSet(rs, out, true, "\t"); + } + + public static void printResultSet(ResultSet rs, PrintStream out, boolean printHeader, String seperator) throws SQLException { + ResultSetMetaData metadata = rs.getMetaData(); + int columnCount = metadata.getColumnCount(); + if (printHeader) { + for (int columnIndex = 1; columnIndex <= columnCount; ++columnIndex) { + if (columnIndex != 1) { + out.print(seperator); + } + out.print(metadata.getColumnName(columnIndex)); + } + } + + out.println(); + + while (rs.next()) { + + for (int columnIndex = 1; columnIndex <= columnCount; ++columnIndex) { + if (columnIndex != 1) { + out.print(seperator); + } + + int type = metadata.getColumnType(columnIndex); + + if (type == Types.VARCHAR || type == Types.CHAR || type == Types.NVARCHAR || type == Types.NCHAR) { + out.print(rs.getString(columnIndex)); + } else if (type == Types.DATE) { + Date date = rs.getDate(columnIndex); + if (rs.wasNull()) { + out.print("null"); + } else { + out.print(date.toString()); + } + } else if (type == Types.BIT) { + boolean value = rs.getBoolean(columnIndex); + if (rs.wasNull()) { + out.print("null"); + } else { + out.print(Boolean.toString(value)); + } + } else if (type == Types.BOOLEAN) { + boolean value = rs.getBoolean(columnIndex); + if (rs.wasNull()) { + out.print("null"); + } else { + out.print(Boolean.toString(value)); + } + } else if (type == Types.TINYINT) { + byte value = rs.getByte(columnIndex); + if (rs.wasNull()) { + out.print("null"); + } else { + out.print(Byte.toString(value)); + } + } else if (type == Types.SMALLINT) { + short value = rs.getShort(columnIndex); + if (rs.wasNull()) { + out.print("null"); + } else { + out.print(Short.toString(value)); + } + } else if (type == Types.INTEGER) { + int value = rs.getInt(columnIndex); + if (rs.wasNull()) { + out.print("null"); + } else { + out.print(Integer.toString(value)); + } + } else if (type == Types.BIGINT) { + long value = rs.getLong(columnIndex); + if (rs.wasNull()) { + out.print("null"); + } else { + out.print(Long.toString(value)); + } + } else if (type == Types.TIMESTAMP) { + out.print(String.valueOf(rs.getTimestamp(columnIndex))); + } else if (type == Types.DECIMAL) { + out.print(String.valueOf(rs.getBigDecimal(columnIndex))); + } else if (type == Types.CLOB) { + out.print(String.valueOf(rs.getString(columnIndex))); + } else if (type == Types.JAVA_OBJECT) { + Object object = rs.getObject(columnIndex); + + if (rs.wasNull()) { + out.print("null"); + } else { + out.print(String.valueOf(object)); + } + } else if (type == Types.LONGVARCHAR) { + Object object = rs.getString(columnIndex); + + if (rs.wasNull()) { + out.print("null"); + } else { + out.print(String.valueOf(object)); + } + } else if (type == Types.NULL) { + out.print("null"); + } else { + Object object = rs.getObject(columnIndex); + + if (rs.wasNull()) { + out.print("null"); + } else { + if (object instanceof byte[]) { + byte[] bytes = (byte[]) object; + String text = HexBin.encode(bytes); + out.print(text); + } else { + out.print(String.valueOf(object)); + } + } + } + } + out.println(); + } + } + + public static String getTypeName(int sqlType) { + switch (sqlType) { + case Types.ARRAY: + return "ARRAY"; + + case Types.BIGINT: + return "BIGINT"; + + case Types.BINARY: + return "BINARY"; + + case Types.BIT: + return "BIT"; + + case Types.BLOB: + return "BLOB"; + + case Types.BOOLEAN: + return "BOOLEAN"; + + case Types.CHAR: + return "CHAR"; + + case Types.CLOB: + return "CLOB"; + + case Types.DATALINK: + return "DATALINK"; + + case Types.DATE: + return "DATE"; + + case Types.DECIMAL: + return "DECIMAL"; + + case Types.DISTINCT: + return "DISTINCT"; + + case Types.DOUBLE: + return "DOUBLE"; + + case Types.FLOAT: + return "FLOAT"; + + case Types.INTEGER: + return "INTEGER"; + + case Types.JAVA_OBJECT: + return "JAVA_OBJECT"; + + case Types.LONGNVARCHAR: + return "LONGNVARCHAR"; + + case Types.LONGVARBINARY: + return "LONGVARBINARY"; + + case Types.NCHAR: + return "NCHAR"; + + case Types.NCLOB: + return "NCLOB"; + + case Types.NULL: + return "NULL"; + + case Types.NUMERIC: + return "NUMERIC"; + + case Types.NVARCHAR: + return "NVARCHAR"; + + case Types.REAL: + return "REAL"; + + case Types.REF: + return "REF"; + + case Types.ROWID: + return "ROWID"; + + case Types.SMALLINT: + return "SMALLINT"; + + case Types.SQLXML: + return "SQLXML"; + + case Types.STRUCT: + return "STRUCT"; + + case Types.TIME: + return "TIME"; + + case Types.TIMESTAMP: + return "TIMESTAMP"; + + case Types.TINYINT: + return "TINYINT"; + + case Types.VARBINARY: + return "VARBINARY"; + + case Types.VARCHAR: + return "VARCHAR"; + + default: + return "OTHER"; + + } + } + + public static String getDbType(String rawUrl, String driverClassName) { + if (rawUrl == null) { + return null; + } + + if (rawUrl.startsWith("jdbc:derby:") || rawUrl.startsWith("jdbc:log4jdbc:derby:")) { + return DERBY; + } else if (rawUrl.startsWith("jdbc:mysql:") || rawUrl.startsWith("jdbc:cobar:") + || rawUrl.startsWith("jdbc:log4jdbc:mysql:")) { + return MYSQL; + } else if (rawUrl.startsWith("jdbc:mariadb:")) { + return MARIADB; + } else if (rawUrl.startsWith("jdbc:oracle:") || rawUrl.startsWith("jdbc:log4jdbc:oracle:")) { + return ORACLE; + } else if (rawUrl.startsWith("jdbc:alibaba:oracle:")) { + return ALI_ORACLE; + } else if (rawUrl.startsWith("jdbc:microsoft:") || rawUrl.startsWith("jdbc:log4jdbc:microsoft:")) { + return SQL_SERVER; + } else if (rawUrl.startsWith("jdbc:sqlserver:") || rawUrl.startsWith("jdbc:log4jdbc:sqlserver:")) { + return SQL_SERVER; + } else if (rawUrl.startsWith("jdbc:sybase:Tds:") || rawUrl.startsWith("jdbc:log4jdbc:sybase:")) { + return SYBASE; + } else if (rawUrl.startsWith("jdbc:jtds:") || rawUrl.startsWith("jdbc:log4jdbc:jtds:")) { + return JTDS; + } else if (rawUrl.startsWith("jdbc:fake:") || rawUrl.startsWith("jdbc:mock:")) { + return MOCK; + } else if (rawUrl.startsWith("jdbc:postgresql:") || rawUrl.startsWith("jdbc:log4jdbc:postgresql:")) { + return POSTGRESQL; + } else if (rawUrl.startsWith("jdbc:edb:")) { + return ENTERPRISEDB; + } else if (rawUrl.startsWith("jdbc:hsqldb:") || rawUrl.startsWith("jdbc:log4jdbc:hsqldb:")) { + return HSQL; + } else if (rawUrl.startsWith("jdbc:odps:")) { + return ODPS; + } else if (rawUrl.startsWith("jdbc:db2:")) { + return DB2; + } else if (rawUrl.startsWith("jdbc:sqlite:")) { + return SQLITE; + } else if (rawUrl.startsWith("jdbc:ingres:")) { + return "ingres"; + } else if (rawUrl.startsWith("jdbc:h2:") || rawUrl.startsWith("jdbc:log4jdbc:h2:")) { + return H2; + } else if (rawUrl.startsWith("jdbc:mckoi:")) { + return "mckoi"; + } else if (rawUrl.startsWith("jdbc:cloudscape:")) { + return "cloudscape"; + } else if (rawUrl.startsWith("jdbc:informix-sqli:") || rawUrl.startsWith("jdbc:log4jdbc:informix-sqli:")) { + return "informix"; + } else if (rawUrl.startsWith("jdbc:timesten:")) { + return "timesten"; + } else if (rawUrl.startsWith("jdbc:as400:")) { + return "as400"; + } else if (rawUrl.startsWith("jdbc:sapdb:")) { + return "sapdb"; + } else if (rawUrl.startsWith("jdbc:JSQLConnect:")) { + return "JSQLConnect"; + } else if (rawUrl.startsWith("jdbc:JTurbo:")) { + return "JTurbo"; + } else if (rawUrl.startsWith("jdbc:firebirdsql:")) { + return "firebirdsql"; + } else if (rawUrl.startsWith("jdbc:interbase:")) { + return "interbase"; + } else if (rawUrl.startsWith("jdbc:pointbase:")) { + return "pointbase"; + } else if (rawUrl.startsWith("jdbc:edbc:")) { + return "edbc"; + } else if (rawUrl.startsWith("jdbc:mimer:multi1:")) { + return "mimer"; + } else if (rawUrl.startsWith("jdbc:dm:")) { + return JdbcConstants.DM; + } else if (rawUrl.startsWith("jdbc:kingbase:")) { + return JdbcConstants.KINGBASE; + } else if (rawUrl.startsWith("jdbc:gbase:")) { + return JdbcConstants.GBASE; + } else if (rawUrl.startsWith("jdbc:xugu:")) { + return JdbcConstants.XUGU; + } else if (rawUrl.startsWith("jdbc:log4jdbc:")) { + return LOG4JDBC; + } else if (rawUrl.startsWith("jdbc:hive:")) { + return HIVE; + } else if (rawUrl.startsWith("jdbc:hive2:")) { + return HIVE; + } else if (rawUrl.startsWith("jdbc:phoenix:")) { + return PHOENIX; + } else if (rawUrl.startsWith("jdbc:elastic:")) { + return ELASTIC_SEARCH; + } else if (rawUrl.startsWith("jdbc:clickhouse:")) { + return CLICKHOUSE; + }else if (rawUrl.startsWith("jdbc:presto:")) { + return PRESTO; + } else { + return null; + } + } + + public static Driver createDriver(String driverClassName) throws SQLException { + return createDriver(null, driverClassName); + } + + public static Driver createDriver(ClassLoader classLoader, String driverClassName) throws SQLException { + Class clazz = null; + if (classLoader != null) { + try { + clazz = classLoader.loadClass(driverClassName); + } catch (ClassNotFoundException e) { + // skip + } + } + + if (clazz == null) { + try { + ClassLoader contextLoader = Thread.currentThread().getContextClassLoader(); + if (contextLoader != null) { + clazz = contextLoader.loadClass(driverClassName); + } + } catch (ClassNotFoundException e) { + // skip + } + } + + if (clazz == null) { + try { + clazz = Class.forName(driverClassName); + } catch (ClassNotFoundException e) { + throw new SQLException(e.getMessage(), e); + } + } + + try { + return (Driver) clazz.newInstance(); + } catch (IllegalAccessException e) { + throw new SQLException(e.getMessage(), e); + } catch (InstantiationException e) { + throw new SQLException(e.getMessage(), e); + } + } + + public static int executeUpdate(DataSource dataSource, String sql, Object... parameters) throws SQLException { + return executeUpdate(dataSource, sql, Arrays.asList(parameters)); + } + + public static int executeUpdate(DataSource dataSource, String sql, List parameters) throws SQLException { + Connection conn = null; + try { + conn = dataSource.getConnection(); + return executeUpdate(conn, sql, parameters); + } finally { + close(conn); + } + } + + public static int executeUpdate(Connection conn, String sql, List parameters) throws SQLException { + PreparedStatement stmt = null; + + int updateCount; + try { + stmt = conn.prepareStatement(sql); + + setParameters(stmt, parameters); + + updateCount = stmt.executeUpdate(); + } finally { + JdbcUtils.close(stmt); + } + + return updateCount; + } + + public static void execute(DataSource dataSource, String sql, Object... parameters) throws SQLException { + execute(dataSource, sql, Arrays.asList(parameters)); + } + + public static void execute(DataSource dataSource, String sql, List parameters) throws SQLException { + Connection conn = null; + try { + conn = dataSource.getConnection(); + execute(conn, sql, parameters); + } finally { + close(conn); + } + } + + public static void execute(Connection conn, String sql) throws SQLException { + execute(conn, sql, Collections.emptyList()); + } + + public static void execute(Connection conn, String sql, List parameters) throws SQLException { + PreparedStatement stmt = null; + + try { + stmt = conn.prepareStatement(sql); + + setParameters(stmt, parameters); + + stmt.executeUpdate(); + } finally { + JdbcUtils.close(stmt); + } + } + + public static List> executeQuery(DataSource dataSource, String sql, Object... parameters) + throws SQLException { + return executeQuery(dataSource, sql, Arrays.asList(parameters)); + } + + public static List> executeQuery(DataSource dataSource, String sql, List parameters) + throws SQLException { + Connection conn = null; + try { + conn = dataSource.getConnection(); + return executeQuery(conn, sql, parameters); + } finally { + close(conn); + } + } + + public static List> executeQuery(Connection conn, String sql, List parameters) + throws SQLException { + List> rows = new ArrayList>(); + + PreparedStatement stmt = null; + ResultSet rs = null; + try { + stmt = conn.prepareStatement(sql); + + setParameters(stmt, parameters); + + rs = stmt.executeQuery(); + + ResultSetMetaData rsMeta = rs.getMetaData(); + + while (rs.next()) { + Map row = new LinkedHashMap(); + + for (int i = 0, size = rsMeta.getColumnCount(); i < size; ++i) { + String columName = rsMeta.getColumnLabel(i + 1); + Object value = rs.getObject(i + 1); + row.put(columName, value); + } + + rows.add(row); + } + } finally { + JdbcUtils.close(rs); + JdbcUtils.close(stmt); + } + + return rows; + } + + private static void setParameters(PreparedStatement stmt, List parameters) throws SQLException { + for (int i = 0, size = parameters.size(); i < size; ++i) { + Object param = parameters.get(i); + stmt.setObject(i + 1, param); + } + } + + public static void insertToTable(DataSource dataSource, String tableName, Map data) + throws SQLException { + Connection conn = null; + try { + conn = dataSource.getConnection(); + insertToTable(conn, tableName, data); + } finally { + close(conn); + } + } + + public static void insertToTable(Connection conn, String tableName, Map data) throws SQLException { + String sql = makeInsertToTableSql(tableName, data.keySet()); + List parameters = new ArrayList(data.values()); + execute(conn, sql, parameters); + } + + public static String makeInsertToTableSql(String tableName, Collection names) { + StringBuilder sql = new StringBuilder() // + .append("insert into ") // + .append(tableName) // + .append("("); // + + int nameCount = 0; + for (String name : names) { + if (nameCount > 0) { + sql.append(","); + } + sql.append(name); + nameCount++; + } + sql.append(") values ("); + for (int i = 0; i < nameCount; ++i) { + if (i != 0) { + sql.append(","); + } + sql.append("?"); + } + sql.append(")"); + + return sql.toString(); + } + + +} diff --git a/datax-admin/src/main/java/com/wugui/datax/admin/util/RdbmsException.java b/datax-admin/src/main/java/com/wugui/datax/admin/util/RdbmsException.java index 1c34e2c6..73c9886b 100644 --- a/datax-admin/src/main/java/com/wugui/datax/admin/util/RdbmsException.java +++ b/datax-admin/src/main/java/com/wugui/datax/admin/util/RdbmsException.java @@ -1,10 +1,14 @@ package com.wugui.datax.admin.util; -import com.alibaba.druid.util.JdbcConstants; import com.wugui.datatx.core.util.Constant; /** - * Created by judy.lt on 2015/6/5. + * RdbmsException + * + * @author jingwk + * @ClassName RdbmsException + * @Version 2.1.1 + * @since 2020/03/14 07:15 */ public class RdbmsException extends DataXException{ diff --git a/datax-admin/src/main/resources/logback.xml b/datax-admin/src/main/resources/logback.xml index 3f6528f8..131c4fa9 100644 --- a/datax-admin/src/main/resources/logback.xml +++ b/datax-admin/src/main/resources/logback.xml @@ -2,7 +2,7 @@ logback - +