mirror of
https://github.com/WeiYe-Jing/datax-web.git
synced 2026-06-30 21:17:37 +08:00
json数据源构建支持mongodb
This commit is contained in:
parent
a90647ce60
commit
4d75eaabfa
@ -27,6 +27,31 @@ public class DatasourceQueryController extends ApiController {
|
||||
@Autowired
|
||||
private DatasourceQueryService datasourceQueryService;
|
||||
|
||||
/**
|
||||
* 根据数据源id获取mongo库名
|
||||
*
|
||||
* @param datasourceId
|
||||
* @return
|
||||
*/
|
||||
@GetMapping("/getDBs")
|
||||
@ApiOperation("根据数据源id获取mongo库名")
|
||||
public R<List<String>> getDBs(Long datasourceId) throws IOException {
|
||||
return success(datasourceQueryService.getDBs(datasourceId));
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* 根据数据源id,dbname获取CollectionNames
|
||||
*
|
||||
* @param datasourceId
|
||||
* @return
|
||||
*/
|
||||
@GetMapping("/collectionNames")
|
||||
@ApiOperation("根据数据源id,dbname获取CollectionNames")
|
||||
public R<List<String>> getTableNames(Long datasourceId,String dbName) throws IOException {
|
||||
return success(datasourceQueryService.getCollectionNames(datasourceId,dbName));
|
||||
}
|
||||
|
||||
/**
|
||||
* 根据数据源id获取可用表名
|
||||
*
|
||||
|
||||
@ -10,8 +10,8 @@ import java.util.List;
|
||||
*
|
||||
* @author jingwk
|
||||
* @ClassName DataxJsonDto
|
||||
* @Version 2.0
|
||||
* @since 2020/01/11 17:15
|
||||
* @Version 2.1.1
|
||||
* @since 2020/03/14 07:15
|
||||
*/
|
||||
@Data
|
||||
public class DataxJsonDto implements Serializable {
|
||||
@ -39,4 +39,8 @@ public class DataxJsonDto implements Serializable {
|
||||
private RdbmsReaderDto rdbmsReader;
|
||||
|
||||
private RdbmsWriterDto rdbmsWriter;
|
||||
|
||||
private MongoDBReaderDto mongoDBReaderDto;
|
||||
|
||||
private MongoDBWriterDto mongoDBWriterDto;
|
||||
}
|
||||
|
||||
@ -0,0 +1,24 @@
|
||||
package com.wugui.datax.admin.dto;
|
||||
|
||||
import lombok.Data;
|
||||
|
||||
import java.io.Serializable;
|
||||
|
||||
/**
|
||||
* 构建mongodb reader dto
|
||||
*
|
||||
* @author jingwk
|
||||
* @ClassName mongodb reader
|
||||
* @Version 2.1.1
|
||||
* @since 2020/03/14 07:15
|
||||
*/
|
||||
@Data
|
||||
public class MongoDBReaderDto implements Serializable {
|
||||
|
||||
private String address;
|
||||
|
||||
private String dbName;
|
||||
|
||||
private String collectionName;
|
||||
|
||||
}
|
||||
@ -0,0 +1,32 @@
|
||||
package com.wugui.datax.admin.dto;
|
||||
|
||||
import lombok.Data;
|
||||
|
||||
import java.io.Serializable;
|
||||
|
||||
/**
|
||||
* 构建mongodb write dto
|
||||
*
|
||||
* @author jingwk
|
||||
* @ClassName mongodb write dto
|
||||
* @Version 2.1.1
|
||||
* @since 2020/03/14 07:15
|
||||
*/
|
||||
@Data
|
||||
public class MongoDBWriterDto implements Serializable {
|
||||
|
||||
private String address;
|
||||
|
||||
private String dbName;
|
||||
|
||||
private String collectionName;
|
||||
|
||||
/**
|
||||
* 当设置为true时,表示针对相同的upsertKey做更新操作
|
||||
*/
|
||||
private boolean isUpsert;
|
||||
/**
|
||||
* upsertKey指定了没行记录的业务主键。用来做更新时使用。
|
||||
*/
|
||||
private String upsertKey;
|
||||
}
|
||||
@ -1,10 +1,11 @@
|
||||
package com.wugui.datax.admin.service;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.UnknownHostException;
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* 数据库查询服务层
|
||||
* 数据库查询服务
|
||||
*
|
||||
* @author zhouhongfa@gz-yibo.com
|
||||
* @ClassName JdbcDatasourceQueryService
|
||||
@ -13,6 +14,13 @@ import java.util.List;
|
||||
*/
|
||||
public interface DatasourceQueryService {
|
||||
|
||||
/**
|
||||
* 获取db列表
|
||||
* @param id
|
||||
* @return
|
||||
*/
|
||||
List<String> getDBs(Long id) throws UnknownHostException;
|
||||
|
||||
/**
|
||||
* 根据数据源表id查询出可用的表
|
||||
*
|
||||
@ -21,6 +29,12 @@ public interface DatasourceQueryService {
|
||||
*/
|
||||
List<String> getTables(Long id) throws IOException;
|
||||
|
||||
/**
|
||||
* 获取CollectionNames
|
||||
* @param dbName
|
||||
* @return
|
||||
*/
|
||||
List<String> getCollectionNames(long id,String dbName) throws UnknownHostException;
|
||||
|
||||
/**
|
||||
* 根据数据源id,表名查询出该表所有字段
|
||||
|
||||
@ -7,16 +7,18 @@ import com.wugui.datax.admin.service.DatasourceQueryService;
|
||||
import com.wugui.datax.admin.service.JobDatasourceService;
|
||||
import com.wugui.datax.admin.tool.query.BaseQueryTool;
|
||||
import com.wugui.datax.admin.tool.query.HBaseQueryTool;
|
||||
import com.wugui.datax.admin.tool.query.MongoDBQueryTool;
|
||||
import com.wugui.datax.admin.tool.query.QueryToolFactory;
|
||||
import com.wugui.datax.admin.util.DataSourceConstants;
|
||||
import com.wugui.datax.admin.util.JdbcConstants;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.UnknownHostException;
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* TODO
|
||||
* datasource query
|
||||
*
|
||||
* @author zhouhongfa@gz-yibo.com
|
||||
* @ClassName JdbcDatasourceQueryServiceImpl
|
||||
@ -29,22 +31,42 @@ public class DatasourceQueryServiceImpl implements DatasourceQueryService {
|
||||
@Autowired
|
||||
private JobDatasourceService jobDatasourceService;
|
||||
|
||||
@Override
|
||||
public List<String> getDBs(Long id) throws UnknownHostException {
|
||||
//获取数据源对象
|
||||
JobDatasource datasource = jobDatasourceService.getById(id);
|
||||
return MongoDBQueryTool.getInstance(datasource).getDBNames();
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public List<String> getTables(Long id) throws IOException {
|
||||
//获取数据源对象
|
||||
JobDatasource jobDatasource = jobDatasourceService.getById(id);
|
||||
JobDatasource datasource = jobDatasourceService.getById(id);
|
||||
//queryTool组装
|
||||
if (ObjectUtil.isNull(jobDatasource)) {
|
||||
if (ObjectUtil.isNull(datasource)) {
|
||||
return Lists.newArrayList();
|
||||
}
|
||||
if (DataSourceConstants.HBASE.equals(jobDatasource.getDatasource())) {
|
||||
return HBaseQueryTool.getInstance(jobDatasource).getTableNames();
|
||||
if (JdbcConstants.HBASE.equals(datasource.getDatasource())) {
|
||||
return HBaseQueryTool.getInstance(datasource).getTableNames();
|
||||
} else {
|
||||
BaseQueryTool qTool = QueryToolFactory.getByDbType(jobDatasource);
|
||||
BaseQueryTool qTool = QueryToolFactory.getByDbType(datasource);
|
||||
return qTool.getTableNames();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<String> getCollectionNames(long id,String dbName) throws UnknownHostException {
|
||||
//获取数据源对象
|
||||
JobDatasource datasource = jobDatasourceService.getById(id);
|
||||
//queryTool组装
|
||||
if (ObjectUtil.isNull(datasource)) {
|
||||
return Lists.newArrayList();
|
||||
}
|
||||
return MongoDBQueryTool.getInstance(datasource).getCollectionNames(dbName);
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public List<String> getColumns(Long id, String tableName) throws IOException {
|
||||
//获取数据源对象
|
||||
@ -53,8 +75,10 @@ public class DatasourceQueryServiceImpl implements DatasourceQueryService {
|
||||
if (ObjectUtil.isNull(datasource)) {
|
||||
return Lists.newArrayList();
|
||||
}
|
||||
if (DataSourceConstants.HBASE.equals(datasource.getDatasource())) {
|
||||
if (JdbcConstants.HBASE.equals(datasource.getDatasource())) {
|
||||
return HBaseQueryTool.getInstance(datasource).getColumns(tableName);
|
||||
} else if (JdbcConstants.MONGODB.equals(datasource.getDatasource())) {
|
||||
return MongoDBQueryTool.getInstance(datasource).getColumns(tableName);
|
||||
} else {
|
||||
BaseQueryTool queryTool = QueryToolFactory.getByDbType(datasource);
|
||||
return queryTool.getColumnNames(tableName, datasource.getDatasource());
|
||||
|
||||
@ -7,7 +7,7 @@ import com.wugui.datax.admin.service.JobDatasourceService;
|
||||
import com.wugui.datax.admin.tool.query.BaseQueryTool;
|
||||
import com.wugui.datax.admin.tool.query.HBaseQueryTool;
|
||||
import com.wugui.datax.admin.tool.query.QueryToolFactory;
|
||||
import com.wugui.datax.admin.util.DataSourceConstants;
|
||||
import com.wugui.datax.admin.util.JdbcConstants;
|
||||
import org.springframework.stereotype.Service;
|
||||
import org.springframework.transaction.annotation.Transactional;
|
||||
|
||||
@ -26,7 +26,7 @@ public class JobDatasourceServiceImpl extends ServiceImpl<JobJdbcDatasourceMappe
|
||||
|
||||
@Override
|
||||
public Boolean dataSourceTest(JobDatasource jobDatasource) throws IOException {
|
||||
if (DataSourceConstants.HBASE.equals(jobDatasource.getDatasource())) {
|
||||
if (JdbcConstants.HBASE.equals(jobDatasource.getDatasource())) {
|
||||
return HBaseQueryTool.getInstance(jobDatasource).getFamily();
|
||||
}
|
||||
BaseQueryTool queryTool = QueryToolFactory.getByDbType(jobDatasource);
|
||||
|
||||
@ -1,7 +1,6 @@
|
||||
package com.wugui.datax.admin.tool.datax;
|
||||
|
||||
import com.alibaba.druid.util.JdbcConstants;
|
||||
import com.alibaba.druid.util.JdbcUtils;
|
||||
|
||||
import com.google.common.collect.ImmutableList;
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import com.google.common.collect.Lists;
|
||||
@ -12,8 +11,10 @@ import com.wugui.datax.admin.tool.datax.reader.*;
|
||||
import com.wugui.datax.admin.tool.datax.writer.*;
|
||||
import com.wugui.datax.admin.tool.pojo.DataxHbasePojo;
|
||||
import com.wugui.datax.admin.tool.pojo.DataxHivePojo;
|
||||
import com.wugui.datax.admin.tool.pojo.DataxMongoDBPojo;
|
||||
import com.wugui.datax.admin.tool.pojo.DataxRdbmsPojo;
|
||||
import com.wugui.datatx.core.util.Constant;
|
||||
import com.wugui.datax.admin.util.JdbcConstants;
|
||||
import lombok.Data;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
|
||||
@ -23,10 +24,10 @@ import java.util.Map;
|
||||
/**
|
||||
* 构建 com.wugui.datax json的工具类
|
||||
*
|
||||
* @author zhouhongfa@gz-yibo.com
|
||||
* @author jingwk
|
||||
* @ClassName DataxJsonHelper
|
||||
* @Version 1.0
|
||||
* @since 2019/7/30 22:24
|
||||
* @Version 2.1.1
|
||||
* @since 2020/03/14 08:24
|
||||
*/
|
||||
@Data
|
||||
public class DataxJsonHelper implements DataxJsonInterface {
|
||||
@ -78,6 +79,10 @@ public class DataxJsonHelper implements DataxJsonInterface {
|
||||
|
||||
private RdbmsWriterDto rdbmsWriterDto;
|
||||
|
||||
private MongoDBReaderDto mongoDBReaderDto;
|
||||
|
||||
private MongoDBWriterDto mongoDBWriterDto;
|
||||
|
||||
|
||||
//用于保存额外参数
|
||||
private Map<String, Object> extraParams = Maps.newHashMap();
|
||||
@ -91,30 +96,32 @@ public class DataxJsonHelper implements DataxJsonInterface {
|
||||
this.rdbmsReaderDto = dataxJsonDto.getRdbmsReader();
|
||||
this.hbaseReaderDto = dataxJsonDto.getHbaseReader();
|
||||
// reader 插件
|
||||
String readerDbType = JdbcUtils.getDbType(readerDatasource.getJdbcUrl(), readerDatasource.getJdbcDriverClass());
|
||||
if (StringUtils.isEmpty(readerDatasource.getZkAdress())) {
|
||||
if (JdbcConstants.MYSQL.equals(readerDbType)) {
|
||||
readerPlugin = new MysqlReader();
|
||||
} else if (JdbcConstants.ORACLE.equals(readerDbType)) {
|
||||
readerPlugin = new OracleReader();
|
||||
} else if (JdbcConstants.SQL_SERVER.equals(readerDbType)) {
|
||||
readerPlugin = new SqlServerReader();
|
||||
} else if (JdbcConstants.POSTGRESQL.equals(readerDbType)) {
|
||||
readerPlugin = new PostgresqlReader();
|
||||
} else if (JdbcConstants.HIVE.equals(readerDbType)) {
|
||||
readerPlugin = new HiveReader();
|
||||
buildReader = buildHiveReader();
|
||||
}
|
||||
if (!JdbcConstants.HIVE.equals(readerDbType)) {
|
||||
buildReader = this.buildReader();
|
||||
}
|
||||
} else {
|
||||
String datasource = readerDatasource.getDatasource();
|
||||
if (JdbcConstants.MYSQL.equals(datasource)) {
|
||||
readerPlugin = new MysqlReader();
|
||||
buildReader = buildReader();
|
||||
} else if (JdbcConstants.ORACLE.equals(datasource)) {
|
||||
readerPlugin = new OracleReader();
|
||||
buildReader = buildReader();
|
||||
} else if (JdbcConstants.SQL_SERVER.equals(datasource)) {
|
||||
readerPlugin = new SqlServerReader();
|
||||
buildReader = buildReader();
|
||||
} else if (JdbcConstants.POSTGRESQL.equals(datasource)) {
|
||||
readerPlugin = new PostgresqlReader();
|
||||
buildReader = buildReader();
|
||||
} else if (JdbcConstants.HIVE.equals(datasource)) {
|
||||
readerPlugin = new HiveReader();
|
||||
buildReader = buildHiveReader();
|
||||
} else if (JdbcConstants.HBASE.equals(datasource)) {
|
||||
readerPlugin = new HBaseReader();
|
||||
buildReader = buildHBaseReader();
|
||||
} else if (JdbcConstants.MONGODB.equals(datasource)) {
|
||||
readerPlugin = new MongoDBReader();
|
||||
buildReader = buildMongoDBReader();
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
|
||||
public void initWriter(DataxJsonDto dataxJsonDto, JobDatasource readerDatasource) {
|
||||
this.writerDatasource = readerDatasource;
|
||||
this.writerTables = dataxJsonDto.getWriterTables();
|
||||
@ -122,27 +129,30 @@ public class DataxJsonHelper implements DataxJsonInterface {
|
||||
this.hiveWriterDto = dataxJsonDto.getHiveWriter();
|
||||
this.rdbmsWriterDto = dataxJsonDto.getRdbmsWriter();
|
||||
this.hbaseWriterDto = dataxJsonDto.getHbaseWriter();
|
||||
this.mongoDBWriterDto=dataxJsonDto.getMongoDBWriterDto();
|
||||
// writer
|
||||
String writerDbType = JdbcUtils.getDbType(writerDatasource.getJdbcUrl(), writerDatasource.getJdbcDriverClass());
|
||||
if (StringUtils.isEmpty(readerDatasource.getZkAdress())) {
|
||||
if (JdbcConstants.MYSQL.equals(writerDbType)) {
|
||||
writerPlugin = new MysqlWriter();
|
||||
} else if (JdbcConstants.ORACLE.equals(writerDbType)) {
|
||||
writerPlugin = new OraclelWriter();
|
||||
} else if (JdbcConstants.SQL_SERVER.equals(writerDbType)) {
|
||||
writerPlugin = new SqlServerlWriter();
|
||||
} else if (JdbcConstants.POSTGRESQL.equals(writerDbType)) {
|
||||
writerPlugin = new PostgresqllWriter();
|
||||
} else if (JdbcConstants.HIVE.equals(writerDbType)) {
|
||||
writerPlugin = new HiveWriter();
|
||||
buildWriter = this.buildHiveWriter();
|
||||
}
|
||||
if (!JdbcConstants.HIVE.equals(writerDbType)) {
|
||||
buildWriter = this.buildWriter();
|
||||
}
|
||||
} else {
|
||||
String datasource = readerDatasource.getDatasource();
|
||||
if (JdbcConstants.MYSQL.equals(datasource)) {
|
||||
writerPlugin = new MysqlWriter();
|
||||
buildWriter = this.buildWriter();
|
||||
} else if (JdbcConstants.ORACLE.equals(datasource)) {
|
||||
writerPlugin = new OraclelWriter();
|
||||
buildWriter = this.buildWriter();
|
||||
} else if (JdbcConstants.SQL_SERVER.equals(datasource)) {
|
||||
writerPlugin = new SqlServerlWriter();
|
||||
buildWriter = this.buildWriter();
|
||||
} else if (JdbcConstants.POSTGRESQL.equals(datasource)) {
|
||||
writerPlugin = new PostgresqllWriter();
|
||||
buildWriter = this.buildWriter();
|
||||
} else if (JdbcConstants.HIVE.equals(datasource)) {
|
||||
writerPlugin = new HiveWriter();
|
||||
buildWriter = this.buildHiveWriter();
|
||||
} else if (JdbcConstants.HBASE.equals(datasource)) {
|
||||
writerPlugin = new HBaseWriter();
|
||||
buildWriter = buildHBaseWriter();
|
||||
buildWriter = this.buildHBaseWriter();
|
||||
}else if (JdbcConstants.MONGODB.equals(datasource)) {
|
||||
writerPlugin = new MongoDBWriter();
|
||||
buildWriter = this.buildMongoDBWriter();
|
||||
}
|
||||
}
|
||||
|
||||
@ -230,6 +240,22 @@ public class DataxJsonHelper implements DataxJsonInterface {
|
||||
dataxHbasePojo.setReaderRange(hbaseReaderDto.getReaderRange());
|
||||
return readerPlugin.buildHbase(dataxHbasePojo);
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public Map<String, Object> buildMongoDBReader() {
|
||||
DataxMongoDBPojo dataxMongoDBPojo = new DataxMongoDBPojo();
|
||||
dataxMongoDBPojo.setJdbcDatasource(readerDatasource);
|
||||
List<Map<String, Object>> columns = Lists.newArrayList();
|
||||
buildColumns(readerColumns, columns);
|
||||
dataxMongoDBPojo.setColumns(columns);
|
||||
dataxMongoDBPojo.setAddress(readerDatasource.getJdbcUrl());
|
||||
dataxMongoDBPojo.setDbName(mongoDBReaderDto.getDbName());
|
||||
dataxMongoDBPojo.setCollectionName(mongoDBReaderDto.getCollectionName());
|
||||
return readerPlugin.buildMongoDB(dataxMongoDBPojo);
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public Map<String, Object> buildWriter() {
|
||||
DataxRdbmsPojo dataxPluginPojo = new DataxRdbmsPojo();
|
||||
@ -245,12 +271,7 @@ public class DataxJsonHelper implements DataxJsonInterface {
|
||||
DataxHivePojo dataxHivePojo = new DataxHivePojo();
|
||||
dataxHivePojo.setJdbcDatasource(writerDatasource);
|
||||
List<Map<String, Object>> columns = Lists.newArrayList();
|
||||
writerColumns.forEach(c -> {
|
||||
Map<String, Object> column = Maps.newLinkedHashMap();
|
||||
column.put("name", c.split(Constant.SPLIT_SCOLON)[1]);
|
||||
column.put("type", c.split(Constant.SPLIT_SCOLON)[2]);
|
||||
columns.add(column);
|
||||
});
|
||||
buildColumns(writerColumns, columns);
|
||||
dataxHivePojo.setColumns(columns);
|
||||
dataxHivePojo.setWriterDefaultFS(hiveWriterDto.getWriterDefaultFS());
|
||||
dataxHivePojo.setWriteFieldDelimiter(hiveWriterDto.getWriteFieldDelimiter());
|
||||
@ -261,14 +282,23 @@ public class DataxJsonHelper implements DataxJsonInterface {
|
||||
return writerPlugin.buildHive(dataxHivePojo);
|
||||
}
|
||||
|
||||
private void buildColumns(List<String> columns, List<Map<String, Object>> returnColumns) {
|
||||
columns.forEach(c -> {
|
||||
Map<String, Object> column = Maps.newLinkedHashMap();
|
||||
column.put("name", c.split(Constant.SPLIT_SCOLON)[1]);
|
||||
column.put("type", c.split(Constant.SPLIT_SCOLON)[2]);
|
||||
returnColumns.add(column);
|
||||
});
|
||||
}
|
||||
|
||||
@Override
|
||||
public Map<String, Object> buildHBaseWriter() {
|
||||
DataxHbasePojo dataxHbasePojo = new DataxHbasePojo();
|
||||
dataxHbasePojo.setJdbcDatasource(writerDatasource);
|
||||
List<Map<String, Object>> columns = Lists.newArrayList();
|
||||
for(int i =0; i<writerColumns.size();i++){
|
||||
for (int i = 0; i < writerColumns.size(); i++) {
|
||||
Map<String, Object> column = Maps.newLinkedHashMap();
|
||||
column.put("index",i);
|
||||
column.put("index", i);
|
||||
column.put("name", writerColumns.get(i));
|
||||
column.put("type", "string");
|
||||
columns.add(column);
|
||||
@ -281,4 +311,20 @@ public class DataxJsonHelper implements DataxJsonInterface {
|
||||
dataxHbasePojo.setWriterMode(hbaseWriterDto.getWriterMode());
|
||||
return writerPlugin.buildHbase(dataxHbasePojo);
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public Map<String, Object> buildMongoDBWriter() {
|
||||
DataxMongoDBPojo dataxMongoDBPojo = new DataxMongoDBPojo();
|
||||
dataxMongoDBPojo.setJdbcDatasource(writerDatasource);
|
||||
List<Map<String, Object>> columns = Lists.newArrayList();
|
||||
buildColumns(writerColumns, columns);
|
||||
dataxMongoDBPojo.setColumns(columns);
|
||||
dataxMongoDBPojo.setAddress(writerDatasource.getJdbcUrl());
|
||||
dataxMongoDBPojo.setDbName(mongoDBWriterDto.getDbName());
|
||||
dataxMongoDBPojo.setCollectionName(mongoDBWriterDto.getCollectionName());
|
||||
dataxMongoDBPojo.setUpsert(mongoDBWriterDto.isUpsert());
|
||||
dataxMongoDBPojo.setUpsertKey(mongoDBWriterDto.getUpsertKey());
|
||||
return writerPlugin.buildMongoDB(dataxMongoDBPojo);
|
||||
}
|
||||
}
|
||||
|
||||
@ -5,10 +5,10 @@ import java.util.Map;
|
||||
/**
|
||||
* 构建 com.wugui.datax json的基础接口
|
||||
*
|
||||
* @author zhouhongfa@gz-yibo.com
|
||||
* @author jingwk
|
||||
* @ClassName DataxJsonHelper
|
||||
* @Version 1.0
|
||||
* @since 2019/7/30 22:24
|
||||
* @Version 2.1.1
|
||||
* @since 2020/03/14 12:24
|
||||
*/
|
||||
public interface DataxJsonInterface {
|
||||
|
||||
@ -28,5 +28,9 @@ public interface DataxJsonInterface {
|
||||
|
||||
Map<String, Object> buildHBaseWriter();
|
||||
|
||||
Map<String, Object> buildMongoDBReader();
|
||||
|
||||
Map<String, Object> buildMongoDBWriter();
|
||||
|
||||
Map<String, Object> buildWriter();
|
||||
}
|
||||
|
||||
@ -2,6 +2,7 @@ package com.wugui.datax.admin.tool.datax;
|
||||
|
||||
import com.wugui.datax.admin.tool.pojo.DataxHbasePojo;
|
||||
import com.wugui.datax.admin.tool.pojo.DataxHivePojo;
|
||||
import com.wugui.datax.admin.tool.pojo.DataxMongoDBPojo;
|
||||
import com.wugui.datax.admin.tool.pojo.DataxRdbmsPojo;
|
||||
|
||||
import java.util.Map;
|
||||
@ -43,6 +44,14 @@ public interface DataxPluginInterface {
|
||||
* @return
|
||||
*/
|
||||
Map<String, Object> buildHbase(DataxHbasePojo dataxHbasePojo);
|
||||
|
||||
/**
|
||||
* mongodb json构建
|
||||
* @param dataxMongoDBPojo
|
||||
* @return
|
||||
*/
|
||||
Map<String,Object> buildMongoDB(DataxMongoDBPojo dataxMongoDBPojo);
|
||||
|
||||
/**
|
||||
* 获取示例
|
||||
*
|
||||
|
||||
@ -7,6 +7,7 @@ import com.wugui.datax.admin.entity.JobDatasource;
|
||||
import com.wugui.datax.admin.tool.datax.BaseDataxPlugin;
|
||||
import com.wugui.datax.admin.tool.pojo.DataxHbasePojo;
|
||||
import com.wugui.datax.admin.tool.pojo.DataxHivePojo;
|
||||
import com.wugui.datax.admin.tool.pojo.DataxMongoDBPojo;
|
||||
import com.wugui.datax.admin.tool.pojo.DataxRdbmsPojo;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
|
||||
@ -65,4 +66,9 @@ public abstract class BaseReaderPlugin extends BaseDataxPlugin {
|
||||
|
||||
@Override
|
||||
public Map<String, Object> buildHbase(DataxHbasePojo dataxHbasePojo) { return null; }
|
||||
|
||||
@Override
|
||||
public Map<String, Object> buildMongoDB(DataxMongoDBPojo dataxMongoDBPojo) {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
@ -0,0 +1,32 @@
|
||||
package com.wugui.datax.admin.tool.datax.reader;
|
||||
|
||||
import com.google.common.collect.Maps;
|
||||
import com.wugui.datax.admin.tool.pojo.DataxMongoDBPojo;
|
||||
|
||||
import java.util.Map;
|
||||
|
||||
public class MongoDBReader extends BaseReaderPlugin implements DataxReaderInterface {
|
||||
@Override
|
||||
public String getName() {
|
||||
return "mongodbreader";
|
||||
}
|
||||
|
||||
@Override
|
||||
public Map<String, Object> sample() {
|
||||
return null;
|
||||
}
|
||||
|
||||
public Map<String, Object> buildMongoDB(DataxMongoDBPojo plugin) {
|
||||
//构建
|
||||
Map<String, Object> readerObj = Maps.newLinkedHashMap();
|
||||
readerObj.put("name", getName());
|
||||
Map<String, Object> parameterObj = Maps.newLinkedHashMap();
|
||||
parameterObj.put("address", plugin.getAddress());
|
||||
parameterObj.put("userName", plugin.getJdbcDatasource().getJdbcUsername());
|
||||
parameterObj.put("userPassword", plugin.getJdbcDatasource().getJdbcPassword());
|
||||
parameterObj.put("dbName", plugin.getDbName());
|
||||
parameterObj.put("collectionName", plugin.getCollectionName());
|
||||
readerObj.put("parameter", parameterObj);
|
||||
return readerObj;
|
||||
}
|
||||
}
|
||||
@ -1,5 +1,7 @@
|
||||
package com.wugui.datax.admin.tool.datax.reader;
|
||||
|
||||
import com.wugui.datax.admin.tool.pojo.DataxMongoDBPojo;
|
||||
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
|
||||
@ -6,6 +6,7 @@ import com.wugui.datax.admin.entity.JobDatasource;
|
||||
import com.wugui.datax.admin.tool.datax.BaseDataxPlugin;
|
||||
import com.wugui.datax.admin.tool.pojo.DataxHbasePojo;
|
||||
import com.wugui.datax.admin.tool.pojo.DataxHivePojo;
|
||||
import com.wugui.datax.admin.tool.pojo.DataxMongoDBPojo;
|
||||
import com.wugui.datax.admin.tool.pojo.DataxRdbmsPojo;
|
||||
|
||||
import java.util.Map;
|
||||
@ -50,5 +51,12 @@ public abstract class BaseWriterPlugin extends BaseDataxPlugin {
|
||||
|
||||
|
||||
@Override
|
||||
public Map<String, Object> buildHbase(DataxHbasePojo dataxHbasePojo) { return null; }
|
||||
public Map<String, Object> buildHbase(DataxHbasePojo dataxHbasePojo) {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Map<String, Object> buildMongoDB(DataxMongoDBPojo plugin) {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
@ -19,8 +19,8 @@ public class HBaseWriter extends BaseWriterPlugin implements DataxWriterInterfac
|
||||
|
||||
public Map<String, Object> buildHbase(DataxHbasePojo plugin) {
|
||||
//构建
|
||||
Map<String, Object> readerObj = Maps.newLinkedHashMap();
|
||||
readerObj.put("name", getName());
|
||||
Map<String, Object> writerObj = Maps.newLinkedHashMap();
|
||||
writerObj.put("name", getName());
|
||||
Map<String, Object> parameterObj = Maps.newLinkedHashMap();
|
||||
Map<String, Object> confige = Maps.newLinkedHashMap();
|
||||
confige.put("hbase.zookeeper.quorum", plugin.getWriterHbaseConfig());
|
||||
@ -34,7 +34,7 @@ public class HBaseWriter extends BaseWriterPlugin implements DataxWriterInterfac
|
||||
if (StringUtils.isNotBlank(plugin.getWriterVersionColumn().getValue())) {
|
||||
parameterObj.put("versionColumn", plugin.getWriterVersionColumn());
|
||||
}
|
||||
readerObj.put("parameter", parameterObj);
|
||||
return readerObj;
|
||||
writerObj.put("parameter", parameterObj);
|
||||
return writerObj;
|
||||
}
|
||||
}
|
||||
|
||||
@ -0,0 +1,40 @@
|
||||
package com.wugui.datax.admin.tool.datax.writer;
|
||||
|
||||
import com.google.common.collect.Maps;
|
||||
import com.wugui.datax.admin.tool.pojo.DataxMongoDBPojo;
|
||||
|
||||
import java.util.Map;
|
||||
|
||||
public class MongoDBWriter extends BaseWriterPlugin implements DataxWriterInterface {
|
||||
@Override
|
||||
public String getName() {
|
||||
return "mongodbwriter";
|
||||
}
|
||||
|
||||
@Override
|
||||
public Map<String, Object> sample() {
|
||||
return null;
|
||||
}
|
||||
|
||||
|
||||
|
||||
@Override
|
||||
public Map<String, Object> buildMongoDB(DataxMongoDBPojo plugin) {
|
||||
//构建
|
||||
Map<String, Object> writerObj = Maps.newLinkedHashMap();
|
||||
writerObj.put("name", getName());
|
||||
Map<String, Object> parameterObj = Maps.newLinkedHashMap();
|
||||
parameterObj.put("address", plugin.getAddress());
|
||||
parameterObj.put("userName", plugin.getJdbcDatasource().getJdbcUsername());
|
||||
parameterObj.put("userPassword", plugin.getJdbcDatasource().getJdbcPassword());
|
||||
parameterObj.put("dbName", plugin.getDbName());
|
||||
parameterObj.put("collectionName", plugin.getCollectionName());
|
||||
writerObj.put("parameter", parameterObj);
|
||||
|
||||
Map<String, Object> upsertInfo = Maps.newLinkedHashMap();
|
||||
upsertInfo.put("isUpsert", plugin.isUpsert());
|
||||
parameterObj.put("upsertKey", plugin.getUpsertKey());
|
||||
writerObj.put("upsertInfo", upsertInfo);
|
||||
return writerObj;
|
||||
}
|
||||
}
|
||||
@ -1,5 +1,6 @@
|
||||
package com.wugui.datax.admin.tool.datax.writer;
|
||||
|
||||
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
@ -15,7 +16,6 @@ public class SqlServerlWriter extends BaseWriterPlugin implements DataxWriterInt
|
||||
return "sqlserverwriter";
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public Map<String, Object> sample() {
|
||||
return null;
|
||||
|
||||
@ -1,5 +1,6 @@
|
||||
package com.wugui.datax.admin.tool.meta;
|
||||
import com.alibaba.druid.util.JdbcConstants;
|
||||
|
||||
import com.wugui.datax.admin.util.JdbcConstants;
|
||||
|
||||
/**
|
||||
* meta信息工厂
|
||||
|
||||
@ -0,0 +1,45 @@
|
||||
package com.wugui.datax.admin.tool.pojo;
|
||||
|
||||
import com.wugui.datax.admin.entity.JobDatasource;
|
||||
import lombok.Data;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* 用于传参,构建json
|
||||
*
|
||||
* @author jingwk
|
||||
* @ClassName DataxMongoDBPojo
|
||||
* @Version 2.0
|
||||
* @since 2020/03/14 11:15
|
||||
*/
|
||||
@Data
|
||||
public class DataxMongoDBPojo {
|
||||
|
||||
/**
|
||||
* hive列名
|
||||
*/
|
||||
private List<Map<String,Object>> columns;
|
||||
|
||||
/**
|
||||
* 数据源信息
|
||||
*/
|
||||
private JobDatasource jdbcDatasource;
|
||||
|
||||
private String address;
|
||||
|
||||
private String dbName;
|
||||
|
||||
private String collectionName;
|
||||
|
||||
/**
|
||||
* 当设置为true时,表示针对相同的upsertKey做更新操作
|
||||
*/
|
||||
private boolean isUpsert;
|
||||
/**
|
||||
* upsertKey指定了没行记录的业务主键。用来做更新时使用。
|
||||
*/
|
||||
private String upsertKey;
|
||||
}
|
||||
|
||||
@ -1,8 +1,7 @@
|
||||
package com.wugui.datax.admin.tool.query;
|
||||
|
||||
import cn.hutool.core.util.StrUtil;
|
||||
import com.alibaba.druid.util.JdbcConstants;
|
||||
import com.alibaba.druid.util.JdbcUtils;
|
||||
|
||||
import com.google.common.collect.ImmutableList;
|
||||
import com.google.common.collect.Lists;
|
||||
import com.wugui.datatx.core.util.Constant;
|
||||
@ -13,6 +12,8 @@ import com.wugui.datax.admin.tool.database.DasColumn;
|
||||
import com.wugui.datax.admin.tool.database.TableInfo;
|
||||
import com.wugui.datax.admin.tool.meta.DatabaseInterface;
|
||||
import com.wugui.datax.admin.tool.meta.DatabaseMetaFactory;
|
||||
import com.wugui.datax.admin.util.JdbcConstants;
|
||||
import com.wugui.datax.admin.util.JdbcUtils;
|
||||
import com.zaxxer.hikari.HikariDataSource;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
@ -53,7 +54,6 @@ public abstract class BaseQueryTool implements QueryToolInterface {
|
||||
* @param jobDatasource
|
||||
*/
|
||||
BaseQueryTool(JobDatasource jobDatasource) throws SQLException {
|
||||
String currentDbType = JdbcUtils.getDbType(jobDatasource.getJdbcUrl(), jobDatasource.getJdbcDriverClass());
|
||||
if (LocalCacheUtil.get(jobDatasource.getDatasourceName()) == null) {
|
||||
getDataSource(jobDatasource);
|
||||
} else {
|
||||
@ -63,7 +63,7 @@ public abstract class BaseQueryTool implements QueryToolInterface {
|
||||
getDataSource(jobDatasource);
|
||||
}
|
||||
}
|
||||
sqlBuilder = DatabaseMetaFactory.getByDbType(currentDbType);
|
||||
sqlBuilder = DatabaseMetaFactory.getByDbType(jobDatasource.getDatasource());
|
||||
currentSchema = getSchema(jobDatasource.getJdbcUsername());
|
||||
LocalCacheUtil.set(jobDatasource.getDatasourceName(), this.connection, 4 * 60 * 60 * 1000);
|
||||
}
|
||||
|
||||
@ -1,18 +1,18 @@
|
||||
package com.wugui.datax.admin.tool.query;
|
||||
|
||||
|
||||
import com.mongodb.BasicDBObject;
|
||||
import com.mongodb.Block;
|
||||
import com.mongodb.MongoClient;
|
||||
import com.mongodb.MongoClientURI;
|
||||
import com.mongodb.*;
|
||||
import com.mongodb.client.MongoCollection;
|
||||
import com.mongodb.client.MongoDatabase;
|
||||
import com.mongodb.client.MongoIterable;
|
||||
import com.wugui.datax.admin.entity.JobDatasource;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.bson.Document;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.UnknownHostException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
|
||||
|
||||
@ -25,9 +25,15 @@ public class MongoDBQueryTool {
|
||||
|
||||
|
||||
|
||||
MongoDBQueryTool(JobDatasource jobDatasource) {
|
||||
MongoDBQueryTool(JobDatasource jobDatasource) throws UnknownHostException {
|
||||
if (mongoClient == null){
|
||||
mongoClient = new MongoClient(new MongoClientURI(jobDatasource.getJdbcUrl()));
|
||||
if(StringUtils.isBlank(jobDatasource.getJdbcUsername()) && StringUtils.isBlank(jobDatasource.getJdbcPassword())){
|
||||
mongoClient = new MongoClient(jobDatasource.getJdbcUrl());
|
||||
}else{
|
||||
MongoCredential credential = MongoCredential.createCredential(jobDatasource.getJdbcUsername(), null, jobDatasource.getJdbcPassword().toCharArray());
|
||||
mongoClient = new MongoClient(parseServerAddress(jobDatasource.getJdbcUrl()), Arrays.asList(credential));
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
@ -35,7 +41,7 @@ public class MongoDBQueryTool {
|
||||
* 获得该类的实例,单例模式
|
||||
* @return
|
||||
*/
|
||||
public static MongoDBQueryTool getInstance(JobDatasource jobDatasource) throws IOException {
|
||||
public static MongoDBQueryTool getInstance(JobDatasource jobDatasource) throws UnknownHostException {
|
||||
if (instance == null) {
|
||||
synchronized(MongoDBQueryTool.class){
|
||||
if(instance == null){
|
||||
@ -98,4 +104,39 @@ public class MongoDBQueryTool {
|
||||
});
|
||||
return list;
|
||||
}
|
||||
|
||||
/**
|
||||
* 判断地址类型是否符合要求
|
||||
* @param addressList
|
||||
* @return
|
||||
*/
|
||||
private static boolean isHostPortPattern(List<Object> addressList) {
|
||||
for(Object address : addressList) {
|
||||
String regex = "(\\S+):([0-9]+)";
|
||||
if(!((String)address).matches(regex)) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
/**
|
||||
* 转换为mongo地址协议
|
||||
* @param rawAddress
|
||||
* @return
|
||||
*/
|
||||
private static List<ServerAddress> parseServerAddress(String rawAddress) throws UnknownHostException {
|
||||
List<ServerAddress> addressList = new ArrayList<>();
|
||||
for(String address : Arrays.asList(rawAddress.split(","))) {
|
||||
String[] tempAddress = address.split(":");
|
||||
try {
|
||||
ServerAddress sa = new ServerAddress(tempAddress[0],Integer.valueOf(tempAddress[1]));
|
||||
addressList.add(sa);
|
||||
} catch (Exception e) {
|
||||
throw new UnknownHostException();
|
||||
}
|
||||
}
|
||||
return addressList;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@ -1,8 +1,7 @@
|
||||
package com.wugui.datax.admin.tool.query;
|
||||
|
||||
import com.alibaba.druid.util.JdbcConstants;
|
||||
import com.alibaba.druid.util.JdbcUtils;
|
||||
import com.wugui.datax.admin.entity.JobDatasource;
|
||||
import com.wugui.datax.admin.util.JdbcConstants;
|
||||
import com.wugui.datax.admin.util.RdbmsException;
|
||||
|
||||
import java.sql.SQLException;
|
||||
@ -17,21 +16,21 @@ import java.sql.SQLException;
|
||||
*/
|
||||
public class QueryToolFactory {
|
||||
|
||||
public static final BaseQueryTool getByDbType(JobDatasource jobJdbcDatasource) {
|
||||
public static final BaseQueryTool getByDbType(JobDatasource jobDatasource) {
|
||||
//获取dbType
|
||||
String dbType = JdbcUtils.getDbType(jobJdbcDatasource.getJdbcUrl(), jobJdbcDatasource.getJdbcDriverClass());
|
||||
if (JdbcConstants.MYSQL.equals(dbType)) {
|
||||
return getMySQLQueryToolInstance(jobJdbcDatasource);
|
||||
} else if (JdbcConstants.ORACLE.equals(dbType)) {
|
||||
return getOracleQueryToolInstance(jobJdbcDatasource);
|
||||
} else if (JdbcConstants.POSTGRESQL.equals(dbType)) {
|
||||
return getPostgresqlQueryToolInstance(jobJdbcDatasource);
|
||||
} else if (JdbcConstants.SQL_SERVER.equals(dbType)) {
|
||||
return getSqlserverQueryToolInstance(jobJdbcDatasource);
|
||||
}else if (JdbcConstants.HIVE.equals(dbType)) {
|
||||
return getHiveQueryToolInstance(jobJdbcDatasource);
|
||||
String datasource = jobDatasource.getDatasource();
|
||||
if (JdbcConstants.MYSQL.equals(datasource)) {
|
||||
return getMySQLQueryToolInstance(jobDatasource);
|
||||
} else if (JdbcConstants.ORACLE.equals(datasource)) {
|
||||
return getOracleQueryToolInstance(jobDatasource);
|
||||
} else if (JdbcConstants.POSTGRESQL.equals(datasource)) {
|
||||
return getPostgresqlQueryToolInstance(jobDatasource);
|
||||
} else if (JdbcConstants.SQL_SERVER.equals(datasource)) {
|
||||
return getSqlserverQueryToolInstance(jobDatasource);
|
||||
}else if (JdbcConstants.HIVE.equals(datasource)) {
|
||||
return getHiveQueryToolInstance(jobDatasource);
|
||||
}
|
||||
throw new UnsupportedOperationException("找不到该类型: ".concat(dbType));
|
||||
throw new UnsupportedOperationException("找不到该类型: ".concat(datasource));
|
||||
}
|
||||
|
||||
private static BaseQueryTool getMySQLQueryToolInstance(JobDatasource jdbcDatasource) {
|
||||
|
||||
@ -1,10 +0,0 @@
|
||||
package com.wugui.datax.admin.util;
|
||||
|
||||
import com.alibaba.druid.util.JdbcConstants;
|
||||
|
||||
public interface DataSourceConstants extends JdbcConstants {
|
||||
|
||||
String HBASE = "hbase";
|
||||
|
||||
String HBASE_ZK_QUORUM = "hbase.zookeeper.quorum";
|
||||
}
|
||||
@ -0,0 +1,119 @@
|
||||
package com.wugui.datax.admin.util;
|
||||
|
||||
|
||||
/**
|
||||
* JdbcConstants
|
||||
*
|
||||
* @author jingwk
|
||||
* @ClassName JdbcConstants
|
||||
* @Version 2.1.1
|
||||
* @since 2020/03/14 07:15
|
||||
*/
|
||||
public interface JdbcConstants {
|
||||
|
||||
|
||||
String HBASE_ZK_QUORUM = "hbase.zookeeper.quorum";
|
||||
|
||||
String MONGODB ="mongodb";
|
||||
|
||||
String JTDS = "jtds";
|
||||
|
||||
String MOCK = "mock";
|
||||
|
||||
String HSQL = "hsql";
|
||||
|
||||
String DB2 = "db2";
|
||||
|
||||
String DB2_DRIVER = "com.ibm.db2.jcc.DB2Driver";
|
||||
|
||||
String POSTGRESQL = "postgresql";
|
||||
String POSTGRESQL_DRIVER = "org.postgresql.Driver";
|
||||
|
||||
String SYBASE = "sybase";
|
||||
|
||||
String SQL_SERVER = "sqlserver";
|
||||
String SQL_SERVER_DRIVER = "com.microsoft.jdbc.sqlserver.SQLServerDriver";
|
||||
String SQL_SERVER_DRIVER_SQLJDBC4 = "com.microsoft.sqlserver.jdbc.SQLServerDriver";
|
||||
String SQL_SERVER_DRIVER_JTDS = "net.sourceforge.jtds.jdbc.Driver";
|
||||
|
||||
String ORACLE = "oracle";
|
||||
String ORACLE_DRIVER = "oracle.jdbc.OracleDriver";
|
||||
String ORACLE_DRIVER2 = "oracle.jdbc.driver.OracleDriver";
|
||||
|
||||
String ALI_ORACLE = "AliOracle";
|
||||
String ALI_ORACLE_DRIVER = "com.alibaba.jdbc.AlibabaDriver";
|
||||
|
||||
String MYSQL = "mysql";
|
||||
String MYSQL_DRIVER = "com.mysql.jdbc.Driver";
|
||||
String MYSQL_DRIVER_6 = "com.mysql.cj.jdbc.Driver";
|
||||
String MYSQL_DRIVER_REPLICATE = "com.mysql.jdbc.";
|
||||
|
||||
String MARIADB = "mariadb";
|
||||
String MARIADB_DRIVER = "org.mariadb.jdbc.Driver";
|
||||
|
||||
String DERBY = "derby";
|
||||
|
||||
String HBASE = "hbase";
|
||||
|
||||
String HIVE = "hive";
|
||||
String HIVE_DRIVER = "org.apache.hive.jdbc.HiveDriver";
|
||||
|
||||
String H2 = "h2";
|
||||
String H2_DRIVER = "org.h2.Driver";
|
||||
|
||||
String DM = "dm";
|
||||
String DM_DRIVER = "dm.jdbc.driver.DmDriver";
|
||||
|
||||
String KINGBASE = "kingbase";
|
||||
String KINGBASE_DRIVER = "com.kingbase.Driver";
|
||||
|
||||
String GBASE = "gbase";
|
||||
String GBASE_DRIVER = "com.gbase.jdbc.Driver";
|
||||
|
||||
String XUGU = "xugu";
|
||||
String XUGU_DRIVER = "com.xugu.cloudjdbc.Driver";
|
||||
|
||||
String OCEANBASE = "oceanbase";
|
||||
String OCEANBASE_DRIVER = "com.mysql.jdbc.Driver";
|
||||
String INFORMIX = "informix";
|
||||
|
||||
/**
|
||||
* 阿里云odps
|
||||
*/
|
||||
String ODPS = "odps";
|
||||
String ODPS_DRIVER = "com.aliyun.odps.jdbc.OdpsDriver";
|
||||
|
||||
String TERADATA = "teradata";
|
||||
String TERADATA_DRIVER = "com.teradata.jdbc.TeraDriver";
|
||||
|
||||
/**
|
||||
* Log4JDBC
|
||||
*/
|
||||
String LOG4JDBC = "log4jdbc";
|
||||
String LOG4JDBC_DRIVER = "net.sf.log4jdbc.DriverSpy";
|
||||
|
||||
String PHOENIX = "phoenix";
|
||||
String PHOENIX_DRIVER = "org.apache.phoenix.jdbc.PhoenixDriver";
|
||||
String ENTERPRISEDB = "edb";
|
||||
String ENTERPRISEDB_DRIVER = "com.edb.Driver";
|
||||
|
||||
String KYLIN = "kylin";
|
||||
String KYLIN_DRIVER = "org.apache.kylin.jdbc.Driver";
|
||||
|
||||
|
||||
String SQLITE = "sqlite";
|
||||
String SQLITE_DRIVER = "org.sqlite.JDBC";
|
||||
|
||||
String ALIYUN_ADS = "aliyun_ads";
|
||||
String ALIYUN_DRDS = "aliyun_drds";
|
||||
|
||||
String PRESTO = "presto";
|
||||
String PRESTO_DRIVER = "com.facebook.presto.jdbc.PrestoDriver";
|
||||
|
||||
String ELASTIC_SEARCH = "elastic_search";
|
||||
|
||||
String ELASTIC_SEARCH_DRIVER = "com.alibaba.xdriver.elastic.jdbc.ElasticDriver";
|
||||
|
||||
String CLICKHOUSE = "clickhouse";
|
||||
String CLICKHOUSE_DRIVER = "ru.yandex.clickhouse.ClickHouseDriver";
|
||||
}
|
||||
@ -0,0 +1,683 @@
|
||||
package com.wugui.datax.admin.util;
|
||||
|
||||
|
||||
import org.apache.xerces.impl.dv.util.HexBin;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import javax.sql.DataSource;
|
||||
import java.io.Closeable;
|
||||
import java.io.InputStream;
|
||||
import java.io.PrintStream;
|
||||
import java.net.URL;
|
||||
import java.sql.Date;
|
||||
import java.sql.Blob;
|
||||
import java.sql.Clob;
|
||||
import java.sql.Connection;
|
||||
import java.sql.Driver;
|
||||
import java.sql.PreparedStatement;
|
||||
import java.sql.ResultSet;
|
||||
import java.sql.ResultSetMetaData;
|
||||
import java.sql.SQLException;
|
||||
import java.sql.Statement;
|
||||
import java.sql.Types;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.Enumeration;
|
||||
import java.util.LinkedHashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Properties;
|
||||
|
||||
|
||||
/**
|
||||
* jdbc utils
|
||||
*
|
||||
* @author jingwk
|
||||
* @ClassName JdbcUtils
|
||||
* @Version 2.1.1
|
||||
* @since 2020/03/14 07:15
|
||||
*/
|
||||
public final class JdbcUtils implements JdbcConstants {
|
||||
|
||||
private static Logger LOG = LoggerFactory.getLogger(JdbcUtils.class);
|
||||
|
||||
|
||||
private static final Properties DRIVER_URL_MAPPING = new Properties();
|
||||
|
||||
private static Boolean mysql_driver_version_6 = null;
|
||||
|
||||
static {
|
||||
try {
|
||||
ClassLoader ctxClassLoader = Thread.currentThread().getContextClassLoader();
|
||||
if (ctxClassLoader != null) {
|
||||
for (Enumeration<URL> e = ctxClassLoader.getResources("META-INF/druid-driver.properties"); e.hasMoreElements();) {
|
||||
URL url = e.nextElement();
|
||||
|
||||
Properties property = new Properties();
|
||||
|
||||
InputStream is = null;
|
||||
try {
|
||||
is = url.openStream();
|
||||
property.load(is);
|
||||
} finally {
|
||||
JdbcUtils.close(is);
|
||||
}
|
||||
|
||||
DRIVER_URL_MAPPING.putAll(property);
|
||||
}
|
||||
}
|
||||
} catch (Exception e) {
|
||||
LOG.error("load druid-driver.properties error", e);
|
||||
}
|
||||
}
|
||||
|
||||
public static void close(Connection x) {
|
||||
if (x == null) {
|
||||
return;
|
||||
}
|
||||
try {
|
||||
x.close();
|
||||
} catch (Exception e) {
|
||||
LOG.debug("close connection error", e);
|
||||
}
|
||||
}
|
||||
|
||||
public static void close(Statement x) {
|
||||
if (x == null) {
|
||||
return;
|
||||
}
|
||||
try {
|
||||
x.close();
|
||||
} catch (Exception e) {
|
||||
LOG.debug("close statement error", e);
|
||||
}
|
||||
}
|
||||
|
||||
public static void close(ResultSet x) {
|
||||
if (x == null) {
|
||||
return;
|
||||
}
|
||||
try {
|
||||
x.close();
|
||||
} catch (Exception e) {
|
||||
LOG.debug("close result set error", e);
|
||||
}
|
||||
}
|
||||
|
||||
public static void close(Closeable x) {
|
||||
if (x == null) {
|
||||
return;
|
||||
}
|
||||
|
||||
try {
|
||||
x.close();
|
||||
} catch (Exception e) {
|
||||
LOG.debug("close error", e);
|
||||
}
|
||||
}
|
||||
|
||||
public static void close(Blob x) {
|
||||
if (x == null) {
|
||||
return;
|
||||
}
|
||||
|
||||
try {
|
||||
x.free();
|
||||
} catch (Exception e) {
|
||||
LOG.debug("close error", e);
|
||||
}
|
||||
}
|
||||
|
||||
public static void close(Clob x) {
|
||||
if (x == null) {
|
||||
return;
|
||||
}
|
||||
|
||||
try {
|
||||
x.free();
|
||||
} catch (Exception e) {
|
||||
LOG.debug("close error", e);
|
||||
}
|
||||
}
|
||||
|
||||
public static void printResultSet(ResultSet rs) throws SQLException {
|
||||
printResultSet(rs, System.out);
|
||||
}
|
||||
|
||||
public static void printResultSet(ResultSet rs, PrintStream out) throws SQLException {
|
||||
printResultSet(rs, out, true, "\t");
|
||||
}
|
||||
|
||||
public static void printResultSet(ResultSet rs, PrintStream out, boolean printHeader, String seperator) throws SQLException {
|
||||
ResultSetMetaData metadata = rs.getMetaData();
|
||||
int columnCount = metadata.getColumnCount();
|
||||
if (printHeader) {
|
||||
for (int columnIndex = 1; columnIndex <= columnCount; ++columnIndex) {
|
||||
if (columnIndex != 1) {
|
||||
out.print(seperator);
|
||||
}
|
||||
out.print(metadata.getColumnName(columnIndex));
|
||||
}
|
||||
}
|
||||
|
||||
out.println();
|
||||
|
||||
while (rs.next()) {
|
||||
|
||||
for (int columnIndex = 1; columnIndex <= columnCount; ++columnIndex) {
|
||||
if (columnIndex != 1) {
|
||||
out.print(seperator);
|
||||
}
|
||||
|
||||
int type = metadata.getColumnType(columnIndex);
|
||||
|
||||
if (type == Types.VARCHAR || type == Types.CHAR || type == Types.NVARCHAR || type == Types.NCHAR) {
|
||||
out.print(rs.getString(columnIndex));
|
||||
} else if (type == Types.DATE) {
|
||||
Date date = rs.getDate(columnIndex);
|
||||
if (rs.wasNull()) {
|
||||
out.print("null");
|
||||
} else {
|
||||
out.print(date.toString());
|
||||
}
|
||||
} else if (type == Types.BIT) {
|
||||
boolean value = rs.getBoolean(columnIndex);
|
||||
if (rs.wasNull()) {
|
||||
out.print("null");
|
||||
} else {
|
||||
out.print(Boolean.toString(value));
|
||||
}
|
||||
} else if (type == Types.BOOLEAN) {
|
||||
boolean value = rs.getBoolean(columnIndex);
|
||||
if (rs.wasNull()) {
|
||||
out.print("null");
|
||||
} else {
|
||||
out.print(Boolean.toString(value));
|
||||
}
|
||||
} else if (type == Types.TINYINT) {
|
||||
byte value = rs.getByte(columnIndex);
|
||||
if (rs.wasNull()) {
|
||||
out.print("null");
|
||||
} else {
|
||||
out.print(Byte.toString(value));
|
||||
}
|
||||
} else if (type == Types.SMALLINT) {
|
||||
short value = rs.getShort(columnIndex);
|
||||
if (rs.wasNull()) {
|
||||
out.print("null");
|
||||
} else {
|
||||
out.print(Short.toString(value));
|
||||
}
|
||||
} else if (type == Types.INTEGER) {
|
||||
int value = rs.getInt(columnIndex);
|
||||
if (rs.wasNull()) {
|
||||
out.print("null");
|
||||
} else {
|
||||
out.print(Integer.toString(value));
|
||||
}
|
||||
} else if (type == Types.BIGINT) {
|
||||
long value = rs.getLong(columnIndex);
|
||||
if (rs.wasNull()) {
|
||||
out.print("null");
|
||||
} else {
|
||||
out.print(Long.toString(value));
|
||||
}
|
||||
} else if (type == Types.TIMESTAMP) {
|
||||
out.print(String.valueOf(rs.getTimestamp(columnIndex)));
|
||||
} else if (type == Types.DECIMAL) {
|
||||
out.print(String.valueOf(rs.getBigDecimal(columnIndex)));
|
||||
} else if (type == Types.CLOB) {
|
||||
out.print(String.valueOf(rs.getString(columnIndex)));
|
||||
} else if (type == Types.JAVA_OBJECT) {
|
||||
Object object = rs.getObject(columnIndex);
|
||||
|
||||
if (rs.wasNull()) {
|
||||
out.print("null");
|
||||
} else {
|
||||
out.print(String.valueOf(object));
|
||||
}
|
||||
} else if (type == Types.LONGVARCHAR) {
|
||||
Object object = rs.getString(columnIndex);
|
||||
|
||||
if (rs.wasNull()) {
|
||||
out.print("null");
|
||||
} else {
|
||||
out.print(String.valueOf(object));
|
||||
}
|
||||
} else if (type == Types.NULL) {
|
||||
out.print("null");
|
||||
} else {
|
||||
Object object = rs.getObject(columnIndex);
|
||||
|
||||
if (rs.wasNull()) {
|
||||
out.print("null");
|
||||
} else {
|
||||
if (object instanceof byte[]) {
|
||||
byte[] bytes = (byte[]) object;
|
||||
String text = HexBin.encode(bytes);
|
||||
out.print(text);
|
||||
} else {
|
||||
out.print(String.valueOf(object));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
out.println();
|
||||
}
|
||||
}
|
||||
|
||||
public static String getTypeName(int sqlType) {
|
||||
switch (sqlType) {
|
||||
case Types.ARRAY:
|
||||
return "ARRAY";
|
||||
|
||||
case Types.BIGINT:
|
||||
return "BIGINT";
|
||||
|
||||
case Types.BINARY:
|
||||
return "BINARY";
|
||||
|
||||
case Types.BIT:
|
||||
return "BIT";
|
||||
|
||||
case Types.BLOB:
|
||||
return "BLOB";
|
||||
|
||||
case Types.BOOLEAN:
|
||||
return "BOOLEAN";
|
||||
|
||||
case Types.CHAR:
|
||||
return "CHAR";
|
||||
|
||||
case Types.CLOB:
|
||||
return "CLOB";
|
||||
|
||||
case Types.DATALINK:
|
||||
return "DATALINK";
|
||||
|
||||
case Types.DATE:
|
||||
return "DATE";
|
||||
|
||||
case Types.DECIMAL:
|
||||
return "DECIMAL";
|
||||
|
||||
case Types.DISTINCT:
|
||||
return "DISTINCT";
|
||||
|
||||
case Types.DOUBLE:
|
||||
return "DOUBLE";
|
||||
|
||||
case Types.FLOAT:
|
||||
return "FLOAT";
|
||||
|
||||
case Types.INTEGER:
|
||||
return "INTEGER";
|
||||
|
||||
case Types.JAVA_OBJECT:
|
||||
return "JAVA_OBJECT";
|
||||
|
||||
case Types.LONGNVARCHAR:
|
||||
return "LONGNVARCHAR";
|
||||
|
||||
case Types.LONGVARBINARY:
|
||||
return "LONGVARBINARY";
|
||||
|
||||
case Types.NCHAR:
|
||||
return "NCHAR";
|
||||
|
||||
case Types.NCLOB:
|
||||
return "NCLOB";
|
||||
|
||||
case Types.NULL:
|
||||
return "NULL";
|
||||
|
||||
case Types.NUMERIC:
|
||||
return "NUMERIC";
|
||||
|
||||
case Types.NVARCHAR:
|
||||
return "NVARCHAR";
|
||||
|
||||
case Types.REAL:
|
||||
return "REAL";
|
||||
|
||||
case Types.REF:
|
||||
return "REF";
|
||||
|
||||
case Types.ROWID:
|
||||
return "ROWID";
|
||||
|
||||
case Types.SMALLINT:
|
||||
return "SMALLINT";
|
||||
|
||||
case Types.SQLXML:
|
||||
return "SQLXML";
|
||||
|
||||
case Types.STRUCT:
|
||||
return "STRUCT";
|
||||
|
||||
case Types.TIME:
|
||||
return "TIME";
|
||||
|
||||
case Types.TIMESTAMP:
|
||||
return "TIMESTAMP";
|
||||
|
||||
case Types.TINYINT:
|
||||
return "TINYINT";
|
||||
|
||||
case Types.VARBINARY:
|
||||
return "VARBINARY";
|
||||
|
||||
case Types.VARCHAR:
|
||||
return "VARCHAR";
|
||||
|
||||
default:
|
||||
return "OTHER";
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
public static String getDbType(String rawUrl, String driverClassName) {
|
||||
if (rawUrl == null) {
|
||||
return null;
|
||||
}
|
||||
|
||||
if (rawUrl.startsWith("jdbc:derby:") || rawUrl.startsWith("jdbc:log4jdbc:derby:")) {
|
||||
return DERBY;
|
||||
} else if (rawUrl.startsWith("jdbc:mysql:") || rawUrl.startsWith("jdbc:cobar:")
|
||||
|| rawUrl.startsWith("jdbc:log4jdbc:mysql:")) {
|
||||
return MYSQL;
|
||||
} else if (rawUrl.startsWith("jdbc:mariadb:")) {
|
||||
return MARIADB;
|
||||
} else if (rawUrl.startsWith("jdbc:oracle:") || rawUrl.startsWith("jdbc:log4jdbc:oracle:")) {
|
||||
return ORACLE;
|
||||
} else if (rawUrl.startsWith("jdbc:alibaba:oracle:")) {
|
||||
return ALI_ORACLE;
|
||||
} else if (rawUrl.startsWith("jdbc:microsoft:") || rawUrl.startsWith("jdbc:log4jdbc:microsoft:")) {
|
||||
return SQL_SERVER;
|
||||
} else if (rawUrl.startsWith("jdbc:sqlserver:") || rawUrl.startsWith("jdbc:log4jdbc:sqlserver:")) {
|
||||
return SQL_SERVER;
|
||||
} else if (rawUrl.startsWith("jdbc:sybase:Tds:") || rawUrl.startsWith("jdbc:log4jdbc:sybase:")) {
|
||||
return SYBASE;
|
||||
} else if (rawUrl.startsWith("jdbc:jtds:") || rawUrl.startsWith("jdbc:log4jdbc:jtds:")) {
|
||||
return JTDS;
|
||||
} else if (rawUrl.startsWith("jdbc:fake:") || rawUrl.startsWith("jdbc:mock:")) {
|
||||
return MOCK;
|
||||
} else if (rawUrl.startsWith("jdbc:postgresql:") || rawUrl.startsWith("jdbc:log4jdbc:postgresql:")) {
|
||||
return POSTGRESQL;
|
||||
} else if (rawUrl.startsWith("jdbc:edb:")) {
|
||||
return ENTERPRISEDB;
|
||||
} else if (rawUrl.startsWith("jdbc:hsqldb:") || rawUrl.startsWith("jdbc:log4jdbc:hsqldb:")) {
|
||||
return HSQL;
|
||||
} else if (rawUrl.startsWith("jdbc:odps:")) {
|
||||
return ODPS;
|
||||
} else if (rawUrl.startsWith("jdbc:db2:")) {
|
||||
return DB2;
|
||||
} else if (rawUrl.startsWith("jdbc:sqlite:")) {
|
||||
return SQLITE;
|
||||
} else if (rawUrl.startsWith("jdbc:ingres:")) {
|
||||
return "ingres";
|
||||
} else if (rawUrl.startsWith("jdbc:h2:") || rawUrl.startsWith("jdbc:log4jdbc:h2:")) {
|
||||
return H2;
|
||||
} else if (rawUrl.startsWith("jdbc:mckoi:")) {
|
||||
return "mckoi";
|
||||
} else if (rawUrl.startsWith("jdbc:cloudscape:")) {
|
||||
return "cloudscape";
|
||||
} else if (rawUrl.startsWith("jdbc:informix-sqli:") || rawUrl.startsWith("jdbc:log4jdbc:informix-sqli:")) {
|
||||
return "informix";
|
||||
} else if (rawUrl.startsWith("jdbc:timesten:")) {
|
||||
return "timesten";
|
||||
} else if (rawUrl.startsWith("jdbc:as400:")) {
|
||||
return "as400";
|
||||
} else if (rawUrl.startsWith("jdbc:sapdb:")) {
|
||||
return "sapdb";
|
||||
} else if (rawUrl.startsWith("jdbc:JSQLConnect:")) {
|
||||
return "JSQLConnect";
|
||||
} else if (rawUrl.startsWith("jdbc:JTurbo:")) {
|
||||
return "JTurbo";
|
||||
} else if (rawUrl.startsWith("jdbc:firebirdsql:")) {
|
||||
return "firebirdsql";
|
||||
} else if (rawUrl.startsWith("jdbc:interbase:")) {
|
||||
return "interbase";
|
||||
} else if (rawUrl.startsWith("jdbc:pointbase:")) {
|
||||
return "pointbase";
|
||||
} else if (rawUrl.startsWith("jdbc:edbc:")) {
|
||||
return "edbc";
|
||||
} else if (rawUrl.startsWith("jdbc:mimer:multi1:")) {
|
||||
return "mimer";
|
||||
} else if (rawUrl.startsWith("jdbc:dm:")) {
|
||||
return JdbcConstants.DM;
|
||||
} else if (rawUrl.startsWith("jdbc:kingbase:")) {
|
||||
return JdbcConstants.KINGBASE;
|
||||
} else if (rawUrl.startsWith("jdbc:gbase:")) {
|
||||
return JdbcConstants.GBASE;
|
||||
} else if (rawUrl.startsWith("jdbc:xugu:")) {
|
||||
return JdbcConstants.XUGU;
|
||||
} else if (rawUrl.startsWith("jdbc:log4jdbc:")) {
|
||||
return LOG4JDBC;
|
||||
} else if (rawUrl.startsWith("jdbc:hive:")) {
|
||||
return HIVE;
|
||||
} else if (rawUrl.startsWith("jdbc:hive2:")) {
|
||||
return HIVE;
|
||||
} else if (rawUrl.startsWith("jdbc:phoenix:")) {
|
||||
return PHOENIX;
|
||||
} else if (rawUrl.startsWith("jdbc:elastic:")) {
|
||||
return ELASTIC_SEARCH;
|
||||
} else if (rawUrl.startsWith("jdbc:clickhouse:")) {
|
||||
return CLICKHOUSE;
|
||||
}else if (rawUrl.startsWith("jdbc:presto:")) {
|
||||
return PRESTO;
|
||||
} else {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
public static Driver createDriver(String driverClassName) throws SQLException {
|
||||
return createDriver(null, driverClassName);
|
||||
}
|
||||
|
||||
public static Driver createDriver(ClassLoader classLoader, String driverClassName) throws SQLException {
|
||||
Class<?> clazz = null;
|
||||
if (classLoader != null) {
|
||||
try {
|
||||
clazz = classLoader.loadClass(driverClassName);
|
||||
} catch (ClassNotFoundException e) {
|
||||
// skip
|
||||
}
|
||||
}
|
||||
|
||||
if (clazz == null) {
|
||||
try {
|
||||
ClassLoader contextLoader = Thread.currentThread().getContextClassLoader();
|
||||
if (contextLoader != null) {
|
||||
clazz = contextLoader.loadClass(driverClassName);
|
||||
}
|
||||
} catch (ClassNotFoundException e) {
|
||||
// skip
|
||||
}
|
||||
}
|
||||
|
||||
if (clazz == null) {
|
||||
try {
|
||||
clazz = Class.forName(driverClassName);
|
||||
} catch (ClassNotFoundException e) {
|
||||
throw new SQLException(e.getMessage(), e);
|
||||
}
|
||||
}
|
||||
|
||||
try {
|
||||
return (Driver) clazz.newInstance();
|
||||
} catch (IllegalAccessException e) {
|
||||
throw new SQLException(e.getMessage(), e);
|
||||
} catch (InstantiationException e) {
|
||||
throw new SQLException(e.getMessage(), e);
|
||||
}
|
||||
}
|
||||
|
||||
public static int executeUpdate(DataSource dataSource, String sql, Object... parameters) throws SQLException {
|
||||
return executeUpdate(dataSource, sql, Arrays.asList(parameters));
|
||||
}
|
||||
|
||||
public static int executeUpdate(DataSource dataSource, String sql, List<Object> parameters) throws SQLException {
|
||||
Connection conn = null;
|
||||
try {
|
||||
conn = dataSource.getConnection();
|
||||
return executeUpdate(conn, sql, parameters);
|
||||
} finally {
|
||||
close(conn);
|
||||
}
|
||||
}
|
||||
|
||||
public static int executeUpdate(Connection conn, String sql, List<Object> parameters) throws SQLException {
|
||||
PreparedStatement stmt = null;
|
||||
|
||||
int updateCount;
|
||||
try {
|
||||
stmt = conn.prepareStatement(sql);
|
||||
|
||||
setParameters(stmt, parameters);
|
||||
|
||||
updateCount = stmt.executeUpdate();
|
||||
} finally {
|
||||
JdbcUtils.close(stmt);
|
||||
}
|
||||
|
||||
return updateCount;
|
||||
}
|
||||
|
||||
public static void execute(DataSource dataSource, String sql, Object... parameters) throws SQLException {
|
||||
execute(dataSource, sql, Arrays.asList(parameters));
|
||||
}
|
||||
|
||||
public static void execute(DataSource dataSource, String sql, List<Object> parameters) throws SQLException {
|
||||
Connection conn = null;
|
||||
try {
|
||||
conn = dataSource.getConnection();
|
||||
execute(conn, sql, parameters);
|
||||
} finally {
|
||||
close(conn);
|
||||
}
|
||||
}
|
||||
|
||||
public static void execute(Connection conn, String sql) throws SQLException {
|
||||
execute(conn, sql, Collections.emptyList());
|
||||
}
|
||||
|
||||
public static void execute(Connection conn, String sql, List<Object> parameters) throws SQLException {
|
||||
PreparedStatement stmt = null;
|
||||
|
||||
try {
|
||||
stmt = conn.prepareStatement(sql);
|
||||
|
||||
setParameters(stmt, parameters);
|
||||
|
||||
stmt.executeUpdate();
|
||||
} finally {
|
||||
JdbcUtils.close(stmt);
|
||||
}
|
||||
}
|
||||
|
||||
public static List<Map<String, Object>> executeQuery(DataSource dataSource, String sql, Object... parameters)
|
||||
throws SQLException {
|
||||
return executeQuery(dataSource, sql, Arrays.asList(parameters));
|
||||
}
|
||||
|
||||
public static List<Map<String, Object>> executeQuery(DataSource dataSource, String sql, List<Object> parameters)
|
||||
throws SQLException {
|
||||
Connection conn = null;
|
||||
try {
|
||||
conn = dataSource.getConnection();
|
||||
return executeQuery(conn, sql, parameters);
|
||||
} finally {
|
||||
close(conn);
|
||||
}
|
||||
}
|
||||
|
||||
public static List<Map<String, Object>> executeQuery(Connection conn, String sql, List<Object> parameters)
|
||||
throws SQLException {
|
||||
List<Map<String, Object>> rows = new ArrayList<Map<String, Object>>();
|
||||
|
||||
PreparedStatement stmt = null;
|
||||
ResultSet rs = null;
|
||||
try {
|
||||
stmt = conn.prepareStatement(sql);
|
||||
|
||||
setParameters(stmt, parameters);
|
||||
|
||||
rs = stmt.executeQuery();
|
||||
|
||||
ResultSetMetaData rsMeta = rs.getMetaData();
|
||||
|
||||
while (rs.next()) {
|
||||
Map<String, Object> row = new LinkedHashMap<String, Object>();
|
||||
|
||||
for (int i = 0, size = rsMeta.getColumnCount(); i < size; ++i) {
|
||||
String columName = rsMeta.getColumnLabel(i + 1);
|
||||
Object value = rs.getObject(i + 1);
|
||||
row.put(columName, value);
|
||||
}
|
||||
|
||||
rows.add(row);
|
||||
}
|
||||
} finally {
|
||||
JdbcUtils.close(rs);
|
||||
JdbcUtils.close(stmt);
|
||||
}
|
||||
|
||||
return rows;
|
||||
}
|
||||
|
||||
private static void setParameters(PreparedStatement stmt, List<Object> parameters) throws SQLException {
|
||||
for (int i = 0, size = parameters.size(); i < size; ++i) {
|
||||
Object param = parameters.get(i);
|
||||
stmt.setObject(i + 1, param);
|
||||
}
|
||||
}
|
||||
|
||||
public static void insertToTable(DataSource dataSource, String tableName, Map<String, Object> data)
|
||||
throws SQLException {
|
||||
Connection conn = null;
|
||||
try {
|
||||
conn = dataSource.getConnection();
|
||||
insertToTable(conn, tableName, data);
|
||||
} finally {
|
||||
close(conn);
|
||||
}
|
||||
}
|
||||
|
||||
public static void insertToTable(Connection conn, String tableName, Map<String, Object> data) throws SQLException {
|
||||
String sql = makeInsertToTableSql(tableName, data.keySet());
|
||||
List<Object> parameters = new ArrayList<Object>(data.values());
|
||||
execute(conn, sql, parameters);
|
||||
}
|
||||
|
||||
public static String makeInsertToTableSql(String tableName, Collection<String> names) {
|
||||
StringBuilder sql = new StringBuilder() //
|
||||
.append("insert into ") //
|
||||
.append(tableName) //
|
||||
.append("("); //
|
||||
|
||||
int nameCount = 0;
|
||||
for (String name : names) {
|
||||
if (nameCount > 0) {
|
||||
sql.append(",");
|
||||
}
|
||||
sql.append(name);
|
||||
nameCount++;
|
||||
}
|
||||
sql.append(") values (");
|
||||
for (int i = 0; i < nameCount; ++i) {
|
||||
if (i != 0) {
|
||||
sql.append(",");
|
||||
}
|
||||
sql.append("?");
|
||||
}
|
||||
sql.append(")");
|
||||
|
||||
return sql.toString();
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
@ -1,10 +1,14 @@
|
||||
package com.wugui.datax.admin.util;
|
||||
|
||||
import com.alibaba.druid.util.JdbcConstants;
|
||||
import com.wugui.datatx.core.util.Constant;
|
||||
|
||||
/**
|
||||
* Created by judy.lt on 2015/6/5.
|
||||
* RdbmsException
|
||||
*
|
||||
* @author jingwk
|
||||
* @ClassName RdbmsException
|
||||
* @Version 2.1.1
|
||||
* @since 2020/03/14 07:15
|
||||
*/
|
||||
public class RdbmsException extends DataXException{
|
||||
|
||||
|
||||
@ -2,7 +2,7 @@
|
||||
<configuration debug="false" scan="true" scanPeriod="1 seconds">
|
||||
|
||||
<contextName>logback</contextName>
|
||||
<property name="log.path" value="/data/applogs/admin/datax-admin.log"/>
|
||||
<property name="log.path" value="./data/applogs/admin/datax-admin.log"/>
|
||||
|
||||
<appender name="console" class="ch.qos.logback.core.ConsoleAppender">
|
||||
<encoder>
|
||||
|
||||
Loading…
Reference in New Issue
Block a user