mirror of
https://github.com/WeiYe-Jing/datax-web.git
synced 2026-06-30 21:17:37 +08:00
Merge branch 'v2.1.1' into dev
This commit is contained in:
commit
b56f4ac32b
@ -1,7 +1,6 @@
|
||||
package com.wugui.datax.admin.service;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.UnknownHostException;
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
@ -19,7 +18,7 @@ public interface DatasourceQueryService {
|
||||
* @param id
|
||||
* @return
|
||||
*/
|
||||
List<String> getDBs(Long id) throws UnknownHostException;
|
||||
List<String> getDBs(Long id) throws IOException;
|
||||
|
||||
/**
|
||||
* 根据数据源表id查询出可用的表
|
||||
@ -34,7 +33,7 @@ public interface DatasourceQueryService {
|
||||
* @param dbName
|
||||
* @return
|
||||
*/
|
||||
List<String> getCollectionNames(long id,String dbName) throws UnknownHostException;
|
||||
List<String> getCollectionNames(long id,String dbName) throws IOException;
|
||||
|
||||
/**
|
||||
* 根据数据源id,表名查询出该表所有字段
|
||||
|
||||
@ -14,7 +14,6 @@ import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.UnknownHostException;
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
@ -32,10 +31,10 @@ public class DatasourceQueryServiceImpl implements DatasourceQueryService {
|
||||
private JobDatasourceService jobDatasourceService;
|
||||
|
||||
@Override
|
||||
public List<String> getDBs(Long id) throws UnknownHostException {
|
||||
public List<String> getDBs(Long id) throws IOException {
|
||||
//获取数据源对象
|
||||
JobDatasource datasource = jobDatasourceService.getById(id);
|
||||
return MongoDBQueryTool.getInstance(datasource).getDBNames();
|
||||
return new MongoDBQueryTool(datasource).getDBNames();
|
||||
}
|
||||
|
||||
|
||||
@ -48,9 +47,9 @@ public class DatasourceQueryServiceImpl implements DatasourceQueryService {
|
||||
return Lists.newArrayList();
|
||||
}
|
||||
if (JdbcConstants.HBASE.equals(datasource.getDatasource())) {
|
||||
return HBaseQueryTool.getInstance(datasource).getTableNames();
|
||||
return new HBaseQueryTool(datasource).getTableNames();
|
||||
} else if(JdbcConstants.MONGODB.equals(datasource.getDatasource())){
|
||||
return MongoDBQueryTool.getInstance(datasource).getCollectionNames(datasource.getDatabaseName());
|
||||
return new MongoDBQueryTool(datasource).getCollectionNames(datasource.getDatabaseName());
|
||||
} else {
|
||||
BaseQueryTool qTool = QueryToolFactory.getByDbType(datasource);
|
||||
return qTool.getTableNames();
|
||||
@ -58,14 +57,14 @@ public class DatasourceQueryServiceImpl implements DatasourceQueryService {
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<String> getCollectionNames(long id,String dbName) throws UnknownHostException {
|
||||
public List<String> getCollectionNames(long id,String dbName) throws IOException {
|
||||
//获取数据源对象
|
||||
JobDatasource datasource = jobDatasourceService.getById(id);
|
||||
//queryTool组装
|
||||
if (ObjectUtil.isNull(datasource)) {
|
||||
return Lists.newArrayList();
|
||||
}
|
||||
return MongoDBQueryTool.getInstance(datasource).getCollectionNames(dbName);
|
||||
return new MongoDBQueryTool(datasource).getCollectionNames(dbName);
|
||||
}
|
||||
|
||||
|
||||
@ -78,9 +77,9 @@ public class DatasourceQueryServiceImpl implements DatasourceQueryService {
|
||||
return Lists.newArrayList();
|
||||
}
|
||||
if (JdbcConstants.HBASE.equals(datasource.getDatasource())) {
|
||||
return HBaseQueryTool.getInstance(datasource).getColumns(tableName);
|
||||
return new HBaseQueryTool(datasource).getColumns(tableName);
|
||||
} else if (JdbcConstants.MONGODB.equals(datasource.getDatasource())) {
|
||||
return MongoDBQueryTool.getInstance(datasource).getColumns(tableName);
|
||||
return new MongoDBQueryTool(datasource).getColumns(tableName);
|
||||
} else {
|
||||
BaseQueryTool queryTool = QueryToolFactory.getByDbType(datasource);
|
||||
return queryTool.getColumnNames(tableName, datasource.getDatasource());
|
||||
|
||||
@ -29,7 +29,7 @@ public class JobDatasourceServiceImpl extends ServiceImpl<JobDatasourceMapper, J
|
||||
@Override
|
||||
public Boolean dataSourceTest(JobDatasource jobDatasource) throws IOException {
|
||||
if (JdbcConstants.HBASE.equals(jobDatasource.getDatasource())) {
|
||||
return HBaseQueryTool.getInstance(jobDatasource).dataSourceTest();
|
||||
return new HBaseQueryTool(jobDatasource).dataSourceTest();
|
||||
}
|
||||
String userName = AESUtil.decrypt(jobDatasource.getJdbcUsername());
|
||||
// 判断账密是否为密文
|
||||
@ -41,7 +41,7 @@ public class JobDatasourceServiceImpl extends ServiceImpl<JobDatasourceMapper, J
|
||||
jobDatasource.setJdbcPassword(AESUtil.encrypt(jobDatasource.getJdbcPassword()));
|
||||
}
|
||||
if (JdbcConstants.MONGODB.equals(jobDatasource.getDatasource())) {
|
||||
return MongoDBQueryTool.getInstance(jobDatasource).dataSourceTest(jobDatasource.getDatabaseName());
|
||||
return new MongoDBQueryTool(jobDatasource).dataSourceTest(jobDatasource.getDatabaseName());
|
||||
}
|
||||
BaseQueryTool queryTool = QueryToolFactory.getByDbType(jobDatasource);
|
||||
return queryTool.dataSourceTest();
|
||||
|
||||
@ -294,7 +294,7 @@ public class DataxJsonHelper implements DataxJsonInterface {
|
||||
List<Map<String, Object>> columns = Lists.newArrayList();
|
||||
for (int i = 0; i < writerColumns.size(); i++) {
|
||||
Map<String, Object> column = Maps.newLinkedHashMap();
|
||||
column.put("index", i);
|
||||
column.put("index", i+1);
|
||||
column.put("name", writerColumns.get(i));
|
||||
column.put("type", "string");
|
||||
columns.add(column);
|
||||
|
||||
@ -2,6 +2,7 @@ package com.wugui.datax.admin.tool.query;
|
||||
|
||||
|
||||
import com.wugui.datatx.core.util.Constants;
|
||||
import com.wugui.datax.admin.core.util.LocalCacheUtil;
|
||||
import com.wugui.datax.admin.entity.JobDatasource;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.*;
|
||||
@ -17,107 +18,101 @@ import java.util.concurrent.Executors;
|
||||
|
||||
public class HBaseQueryTool {
|
||||
|
||||
private static Configuration conf = HBaseConfiguration.create();
|
||||
private static ExecutorService pool = Executors.newScheduledThreadPool(2);
|
||||
private static Connection connection = null;
|
||||
private static HBaseQueryTool instance = null;
|
||||
private static Admin admin;
|
||||
private static Table table;
|
||||
private Configuration conf = HBaseConfiguration.create();
|
||||
private ExecutorService pool = Executors.newScheduledThreadPool(2);
|
||||
private Connection connection = null;
|
||||
private Admin admin;
|
||||
private Table table;
|
||||
|
||||
|
||||
HBaseQueryTool(JobDatasource jobDatasource) throws IOException {
|
||||
if (connection == null|| connection.isClosed()){
|
||||
String[] zkAdress=jobDatasource.getZkAdress().split(Constants.SPLIT_SCOLON);
|
||||
conf.set("hbase.zookeeper.quorum",zkAdress[0]);
|
||||
conf.set("hbase.zookeeper.property.clientPort", zkAdress[1]);
|
||||
connection = ConnectionFactory.createConnection(conf,pool);
|
||||
admin = connection.getAdmin();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 获得该类的实例,单例模式
|
||||
*
|
||||
* @return
|
||||
*/
|
||||
public static HBaseQueryTool getInstance(JobDatasource jobDatasource) throws IOException {
|
||||
if (instance == null) {
|
||||
synchronized(HBaseQueryTool.class){
|
||||
if(instance == null){
|
||||
instance = new HBaseQueryTool(jobDatasource);
|
||||
public HBaseQueryTool(JobDatasource jobDatasource) throws IOException {
|
||||
if (LocalCacheUtil.get(jobDatasource.getDatasourceName()) == null) {
|
||||
getDataSource(jobDatasource);
|
||||
} else {
|
||||
connection = (Connection) LocalCacheUtil.get(jobDatasource.getDatasourceName());
|
||||
if (connection == null || connection.isClosed()) {
|
||||
LocalCacheUtil.remove(jobDatasource.getDatasourceName());
|
||||
getDataSource(jobDatasource);
|
||||
}
|
||||
}
|
||||
}
|
||||
LocalCacheUtil.set(jobDatasource.getDatasourceName(), connection, 4 * 60 * 60 * 1000);
|
||||
}
|
||||
return instance;
|
||||
}
|
||||
|
||||
// 关闭连接
|
||||
public static void sourceClose() {
|
||||
try {
|
||||
if (admin != null) {
|
||||
admin.close();
|
||||
}
|
||||
if (null != connection) {
|
||||
connection.close();
|
||||
}
|
||||
if (table != null) {
|
||||
table.close();
|
||||
}
|
||||
} catch (IOException e) {
|
||||
e.printStackTrace();
|
||||
private void getDataSource(JobDatasource jobDatasource) throws IOException {
|
||||
String[] zkAdress = jobDatasource.getZkAdress().split(Constants.SPLIT_SCOLON);
|
||||
conf.set("hbase.zookeeper.quorum", zkAdress[0]);
|
||||
conf.set("hbase.zookeeper.property.clientPort", zkAdress[1]);
|
||||
connection = ConnectionFactory.createConnection(conf, pool);
|
||||
admin = connection.getAdmin();
|
||||
}
|
||||
}
|
||||
/**
|
||||
* 测试是否连接成功
|
||||
*
|
||||
* @return
|
||||
* @throws IOException
|
||||
*/
|
||||
public boolean dataSourceTest() throws IOException {
|
||||
Admin admin =connection.getAdmin();
|
||||
HTableDescriptor[] tableDescriptor = admin.listTables();
|
||||
return tableDescriptor.length > 0;
|
||||
}
|
||||
|
||||
/**
|
||||
* 获取HBase表名称
|
||||
* @return
|
||||
* @throws IOException
|
||||
*/
|
||||
public List<String> getTableNames() throws IOException {
|
||||
List<String> list = new ArrayList<>();
|
||||
Admin admin = connection.getAdmin();
|
||||
TableName[] names=admin.listTableNames();
|
||||
for (int i = 0; i < names.length; i++) {
|
||||
list.add(names[i].getNameAsString());
|
||||
// 关闭连接
|
||||
public void sourceClose() {
|
||||
try {
|
||||
if (admin != null) {
|
||||
admin.close();
|
||||
}
|
||||
if (null != connection) {
|
||||
connection.close();
|
||||
}
|
||||
if (table != null) {
|
||||
table.close();
|
||||
}
|
||||
} catch (IOException e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
return list;
|
||||
}
|
||||
|
||||
/**
|
||||
*
|
||||
* 通过表名查询所有l列祖和列
|
||||
*
|
||||
* @param tableName
|
||||
* @return
|
||||
* @throws IOException
|
||||
*/
|
||||
public List<String> getColumns(String tableName) throws IOException {
|
||||
List<String> list = new ArrayList<>();
|
||||
table = connection.getTable(TableName.valueOf(tableName));
|
||||
Scan scan = new Scan();
|
||||
//Filter filter = new PageFilter(1);
|
||||
//scan.setFilter(filter);
|
||||
scan.getStartRow();
|
||||
ResultScanner scanner = table.getScanner(scan);
|
||||
Iterator<Result> it = scanner.iterator();
|
||||
if(it.hasNext()) {
|
||||
Result re = it.next();
|
||||
List<Cell> listCells = re.listCells();
|
||||
for (Cell cell : listCells) {
|
||||
list.add(new String(CellUtil.cloneFamily(cell))+":"+new String(CellUtil.cloneQualifier(cell)));
|
||||
}
|
||||
/**
|
||||
* 测试是否连接成功
|
||||
*
|
||||
* @return
|
||||
* @throws IOException
|
||||
*/
|
||||
public boolean dataSourceTest() throws IOException {
|
||||
Admin admin = connection.getAdmin();
|
||||
HTableDescriptor[] tableDescriptor = admin.listTables();
|
||||
return tableDescriptor.length > 0;
|
||||
}
|
||||
|
||||
/**
|
||||
* 获取HBase表名称
|
||||
*
|
||||
* @return
|
||||
* @throws IOException
|
||||
*/
|
||||
public List<String> getTableNames() throws IOException {
|
||||
List<String> list = new ArrayList<>();
|
||||
Admin admin = connection.getAdmin();
|
||||
TableName[] names = admin.listTableNames();
|
||||
for (int i = 0; i < names.length; i++) {
|
||||
list.add(names[i].getNameAsString());
|
||||
}
|
||||
return list;
|
||||
}
|
||||
|
||||
/**
|
||||
* 通过表名查询所有l列祖和列
|
||||
*
|
||||
* @param tableName
|
||||
* @return
|
||||
* @throws IOException
|
||||
*/
|
||||
public List<String> getColumns(String tableName) throws IOException {
|
||||
List<String> list = new ArrayList<>();
|
||||
table = connection.getTable(TableName.valueOf(tableName));
|
||||
Scan scan = new Scan();
|
||||
//Filter filter = new PageFilter(1);
|
||||
//scan.setFilter(filter);
|
||||
scan.getStartRow();
|
||||
ResultScanner scanner = table.getScanner(scan);
|
||||
Iterator<Result> it = scanner.iterator();
|
||||
if (it.hasNext()) {
|
||||
Result re = it.next();
|
||||
List<Cell> listCells = re.listCells();
|
||||
for (Cell cell : listCells) {
|
||||
list.add(new String(CellUtil.cloneFamily(cell)) + ":" + new String(CellUtil.cloneQualifier(cell)));
|
||||
}
|
||||
}
|
||||
return list;
|
||||
}
|
||||
return list;
|
||||
}
|
||||
}
|
||||
|
||||
@ -5,10 +5,12 @@ import com.mongodb.*;
|
||||
import com.mongodb.client.MongoCollection;
|
||||
import com.mongodb.client.MongoDatabase;
|
||||
import com.mongodb.client.MongoIterable;
|
||||
import com.wugui.datax.admin.core.util.LocalCacheUtil;
|
||||
import com.wugui.datax.admin.entity.JobDatasource;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.bson.Document;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.UnknownHostException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
@ -18,43 +20,37 @@ import java.util.List;
|
||||
public class MongoDBQueryTool {
|
||||
|
||||
|
||||
private static MongoClient mongoClient = null;
|
||||
private static MongoDBQueryTool instance = null;
|
||||
private static MongoClient connection = null;
|
||||
private static MongoDatabase collections;
|
||||
|
||||
|
||||
MongoDBQueryTool(JobDatasource jobDatasource) throws UnknownHostException {
|
||||
if (mongoClient == null) {
|
||||
if (StringUtils.isBlank(jobDatasource.getJdbcUsername()) && StringUtils.isBlank(jobDatasource.getJdbcPassword())) {
|
||||
mongoClient = new MongoClient(jobDatasource.getJdbcUrl());
|
||||
} else {
|
||||
MongoCredential credential = MongoCredential.createCredential(jobDatasource.getJdbcUsername(), jobDatasource.getDatabaseName(), jobDatasource.getJdbcPassword().toCharArray());
|
||||
mongoClient = new MongoClient(parseServerAddress(jobDatasource.getJdbcUrl()), Arrays.asList(credential));
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 获得该类的实例,单例模式
|
||||
*
|
||||
* @return
|
||||
*/
|
||||
public static MongoDBQueryTool getInstance(JobDatasource jobDatasource) throws UnknownHostException {
|
||||
if (instance == null) {
|
||||
synchronized (MongoDBQueryTool.class) {
|
||||
if (instance == null) {
|
||||
instance = new MongoDBQueryTool(jobDatasource);
|
||||
}
|
||||
public MongoDBQueryTool(JobDatasource jobDatasource) throws IOException {
|
||||
if (LocalCacheUtil.get(jobDatasource.getDatasourceName()) == null) {
|
||||
getDataSource(jobDatasource);
|
||||
} else {
|
||||
connection = (MongoClient) LocalCacheUtil.get(jobDatasource.getDatasourceName());
|
||||
if (connection == null) {
|
||||
LocalCacheUtil.remove(jobDatasource.getDatasourceName());
|
||||
getDataSource(jobDatasource);
|
||||
}
|
||||
}
|
||||
return instance;
|
||||
LocalCacheUtil.set(jobDatasource.getDatasourceName(), connection, 4 * 60 * 60 * 1000);
|
||||
}
|
||||
|
||||
private void getDataSource(JobDatasource jobDatasource) throws IOException {
|
||||
if (StringUtils.isBlank(jobDatasource.getJdbcUsername()) && StringUtils.isBlank(jobDatasource.getJdbcPassword())) {
|
||||
connection = new MongoClient(jobDatasource.getJdbcUrl());
|
||||
} else {
|
||||
MongoCredential credential = MongoCredential.createCredential(jobDatasource.getJdbcUsername(), jobDatasource.getDatabaseName(), jobDatasource.getJdbcPassword().toCharArray());
|
||||
connection = new MongoClient(parseServerAddress(jobDatasource.getJdbcUrl()), Arrays.asList(credential));
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
// 关闭连接
|
||||
public static void sourceClose() {
|
||||
if (mongoClient != null) {
|
||||
mongoClient.close();
|
||||
if (connection != null) {
|
||||
connection.close();
|
||||
}
|
||||
}
|
||||
|
||||
@ -64,7 +60,7 @@ public class MongoDBQueryTool {
|
||||
* @return
|
||||
*/
|
||||
public List<String> getDBNames() {
|
||||
MongoIterable<String> dbs = mongoClient.listDatabaseNames();
|
||||
MongoIterable<String> dbs = connection.listDatabaseNames();
|
||||
List<String> dbNames = new ArrayList<>();
|
||||
dbs.forEach((Block<? super String>) dbNames::add);
|
||||
return dbNames;
|
||||
@ -76,7 +72,7 @@ public class MongoDBQueryTool {
|
||||
* @return
|
||||
*/
|
||||
public boolean dataSourceTest(String dbName) {
|
||||
collections = mongoClient.getDatabase(dbName);
|
||||
collections = connection.getDatabase(dbName);
|
||||
return collections.listCollectionNames().iterator().hasNext();
|
||||
}
|
||||
|
||||
@ -86,7 +82,7 @@ public class MongoDBQueryTool {
|
||||
* @return
|
||||
*/
|
||||
public List<String> getCollectionNames(String dbName) {
|
||||
collections = mongoClient.getDatabase(dbName);
|
||||
collections = connection.getDatabase(dbName);
|
||||
List<String> collectionNames = new ArrayList<>();
|
||||
collections.listCollectionNames().forEach((Block<? super String>) collectionNames::add);
|
||||
return collectionNames;
|
||||
|
||||
@ -57,7 +57,7 @@ public class AESUtil {
|
||||
return new Base64().encodeToString(result); // 加密
|
||||
}
|
||||
} catch (Exception e) {
|
||||
log.error("content encrypt error {}",e.getMessage());
|
||||
log.warn("content encrypt error {}",e.getMessage());
|
||||
}
|
||||
return null;
|
||||
}
|
||||
@ -81,7 +81,7 @@ public class AESUtil {
|
||||
return new String(result); // 解密
|
||||
}
|
||||
} catch (Exception e) {
|
||||
log.error("content decrypt error {}",e.getMessage());
|
||||
log.warn("content decrypt error {}",e.getMessage());
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
File diff suppressed because one or more lines are too long
File diff suppressed because one or more lines are too long
File diff suppressed because one or more lines are too long
@ -234,8 +234,6 @@ CREATE TABLE `job_user` (
|
||||
-- ----------------------------
|
||||
INSERT INTO `job_user` VALUES (1, 'admin', '$2a$10$2KCqRbra0Yn2TwvkZxtfLuWuUP5KyCWsljO/ci5pLD27pqR3TV1vy', 'ROLE_ADMIN', NULL);
|
||||
|
||||
SET FOREIGN_KEY_CHECKS = 1;
|
||||
|
||||
|
||||
|
||||
/**
|
||||
|
||||
Loading…
Reference in New Issue
Block a user