1、处理任务构建,选择不同schema时,无法获取表字段的问题

2、构建任务JSON时若选择了schema,表名也需拼接schema
This commit is contained in:
jiangyang 2021-12-03 17:18:09 +08:00
parent dacb231f5a
commit c84e1c19f6
26 changed files with 87 additions and 45 deletions

View File

@ -144,7 +144,8 @@
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
<version>${lombok.version}</version>
<scope>compile</scope>
</dependency>
<dependency>

View File

@ -73,7 +73,7 @@ public class MetadataController extends BaseController {
*/
@GetMapping("/getTables")
@ApiOperation("根据数据源id获取可用表名")
public R<List<String>> getTableNames(Long datasourceId,String tableSchema) throws IOException {
public R<List<String>> getTableNames(Long datasourceId, String tableSchema) throws IOException {
return success(datasourceQueryService.getTables(datasourceId,tableSchema));
}
@ -82,12 +82,13 @@ public class MetadataController extends BaseController {
*
* @param datasourceId 数据源id
* @param tableName 表名
* @param tableSchema 模式名
* @return
*/
@GetMapping("/getColumns")
@ApiOperation("根据数据源id和表名获取所有字段")
public R<List<String>> getColumns(Long datasourceId, String tableName) throws IOException {
return success(datasourceQueryService.getColumns(datasourceId, tableName));
public R<List<String>> getColumns(Long datasourceId, String tableName, String tableSchema) throws IOException {
return success(datasourceQueryService.getColumns(datasourceId, tableName, tableSchema));
}
/**

View File

@ -18,10 +18,14 @@ public class DataXBatchJsonBuildDTO implements Serializable {
private Long readerDatasourceId;
private String readerTableSchema;
private List<String> readerTables;
private Long writerDatasourceId;
private String writerTableSchema;
private List<String> writerTables;
private int templateId;

View File

@ -18,6 +18,8 @@ public class DataXJsonBuildDTO implements Serializable {
private Long readerDatasourceId;
private String readerTableSchema;
private List<String> readerTables;
private List<String> readerColumns;
@ -26,6 +28,8 @@ public class DataXJsonBuildDTO implements Serializable {
private Long writerDatasourceId;
private String writerTableSchema;
private List<String> writerTables;
private List<String> writerColumns;

View File

@ -53,10 +53,11 @@ public interface DatasourceQueryService {
* 根据数据源id表名查询出该表所有字段
* @param id
* @param tableName
* @param tableSchema
* @return
* @throws IOException
*/
List<String> getColumns(Long id, String tableName) throws IOException;
List<String> getColumns(Long id, String tableName, String tableSchema) throws IOException;
/**

View File

@ -95,7 +95,7 @@ public class DatasourceQueryServiceImpl implements DatasourceQueryService {
@Override
public List<String> getColumns(Long id, String tableName) throws IOException {
public List<String> getColumns(Long id, String tableName, String tableSchema) throws IOException {
//获取数据源对象
JobDatasource datasource = jobDatasourceService.getById(id);
//queryTool组装
@ -108,7 +108,7 @@ public class DatasourceQueryServiceImpl implements DatasourceQueryService {
return new MongoDBQueryTool(datasource.getType(),datasource.getConnectionParams()).getColumns(tableName);
} else {
BaseQueryTool queryTool = QueryToolFactory.getByDbType(datasource.getType(),datasource.getConnectionParams());
return queryTool.getColumnNames(tableName, datasource.getType());
return queryTool.getColumnNames(tableName, tableSchema, datasource.getType());
}
}

View File

@ -30,21 +30,24 @@ public class DataxJsonServiceImpl implements DataxJsonService {
@Override
public String buildJobJson(DataXJsonBuildDTO dataXJsonBuildDto) {
DataXJsonHelper dataxJsonHelper = new DataXJsonHelper();
// reader
JobDatasource readerDatasource = jobDataSourceService.getById(dataXJsonBuildDto.getReaderDatasourceId());
dataxJsonHelper.initTransformer(dataXJsonBuildDto);
//处理reader表名
processingTableName(readerDatasource, dataXJsonBuildDto.getReaderTables());
//处理reader模式
processingTableSchema(dataXJsonBuildDto.getReaderTableSchema(), dataXJsonBuildDto.getReaderTables());
// reader plugin init
dataxJsonHelper.initReader(dataXJsonBuildDto, readerDatasource);
JobDatasource writerDatasource = jobDataSourceService.getById(dataXJsonBuildDto.getWriterDatasourceId());
//处理writer表名
processingTableName(writerDatasource, dataXJsonBuildDto.getWriterTables());
//处理writer模式
processingTableSchema(dataXJsonBuildDto.getWriterTableSchema(), dataXJsonBuildDto.getWriterTables());
dataxJsonHelper.initWriter(dataXJsonBuildDto, writerDatasource);
return JSON.toJSONString(dataxJsonHelper.buildJob());
@ -54,6 +57,7 @@ public class DataxJsonServiceImpl implements DataxJsonService {
* 处理表名称
* 解决生成json中的表名称大小写敏感问题
* 目前针对Oracle和postgreSQL
*
* @param jobDatasource
* @param tables
*/
@ -65,5 +69,20 @@ public class DataxJsonServiceImpl implements DataxJsonService {
}
}
/**
* 处理表模式名
*
* @param tableSchema
* @param tables
* @return
* @author Locki
* @date 2021/12/3
*/
private void processingTableSchema(String tableSchema, List<String> tables) {
if (tableSchema != null && tableSchema.trim().length() > 0) {
for (int i = 0; i < tables.size(); i++) {
tables.set(i, tableSchema.trim() + "." + tables.get(i));
}
}
}
}

View File

@ -436,12 +436,15 @@ public class JobServiceImpl implements JobService {
List<String> rColumns;
List<String> wColumns;
for (int i = 0; i < rdTables.size(); i++) {
rColumns = datasourceQueryService.getColumns(dto.getReaderDatasourceId(), rdTables.get(i));
wColumns = datasourceQueryService.getColumns(dto.getWriterDatasourceId(), wrTables.get(i));
rColumns = datasourceQueryService.getColumns(dto.getReaderDatasourceId(), rdTables.get(i), dto.getReaderTableSchema());
wColumns = datasourceQueryService.getColumns(dto.getWriterDatasourceId(), wrTables.get(i), dto.getWriterTableSchema());
jsonBuild.setReaderDatasourceId(dto.getReaderDatasourceId());
jsonBuild.setWriterDatasourceId(dto.getWriterDatasourceId());
jsonBuild.setReaderTableSchema(dto.getReaderTableSchema());
jsonBuild.setWriterTableSchema(dto.getWriterTableSchema());
jsonBuild.setReaderColumns(rColumns);
jsonBuild.setWriterColumns(wColumns);

View File

@ -11,8 +11,13 @@ package com.wugui.datax.admin.tool.meta;
public abstract class BaseDatabaseMeta implements DatabaseInterface {
@Override
public String getSQLQueryFields(String tableName) {
return "SELECT * FROM " + tableName + " where 1=0";
public String getSQLQueryFields(String tableSchema, String tableName) {
if (tableSchema == null || tableSchema.trim().length() < 1) {
tableSchema = "";
} else {
tableSchema = tableSchema.trim() + ".";
}
return "SELECT * FROM " + tableSchema + tableName + " where 1=0";
}
@Override

View File

@ -5,10 +5,11 @@ public interface DatabaseInterface {
/**
* Returns the minimal SQL to launch in order to determine the layout of the resultset for a given com.com.wugui.datax.admin.tool.database table
*
* @param tableSchema the schema of the table
* @param tableName The name of the table to determine the layout for
* @return The SQL to launch.
*/
String getSQLQueryFields(String tableName);
String getSQLQueryFields(String tableSchema, String tableName);
/**
* 获取主键字段

View File

@ -83,7 +83,7 @@ public abstract class BaseQueryTool implements QueryToolInterface {
}
@Override
public TableInfo buildTableInfo(final String tableName) {
public TableInfo buildTableInfo(String tableSchema, String tableName) {
//获取表信息
List<Map<String, Object>> tableInfos = this.getTableInfo(tableName);
if (tableInfos.isEmpty()) {
@ -98,7 +98,7 @@ public abstract class BaseQueryTool implements QueryToolInterface {
tableInfo.setComment(StrUtil.toString(tValues.get(1)));
//获取所有字段
List<ColumnInfo> fullColumn = getColumns(tableName);
List<ColumnInfo> fullColumn = getColumns(tableSchema, tableName);
tableInfo.setColumns(fullColumn);
//获取主键列
@ -148,10 +148,10 @@ public abstract class BaseQueryTool implements QueryToolInterface {
}
@Override
public List<ColumnInfo> getColumns(final String tableName) {
public List<ColumnInfo> getColumns(String tableSchema, String tableName) {
List<ColumnInfo> fullColumn = Lists.newArrayList();
//获取查询指定表所有字段的sql语句
String querySql = sqlBuilder.getSQLQueryFields(tableName);
String querySql = sqlBuilder.getSQLQueryFields(tableSchema, tableName);
logger.info("querySql: {}", querySql);
//获取指定表的所有字段
try (Statement statement = connection.createStatement();
@ -255,7 +255,7 @@ public abstract class BaseQueryTool implements QueryToolInterface {
}
@Override
public List<String> getColumnNames(String tableName, final DbType dbType) {
public List<String> getColumnNames(String tableName, String tableSchema, final DbType dbType) {
List<String> res = Lists.newArrayList();
//处理表名
@ -263,7 +263,7 @@ public abstract class BaseQueryTool implements QueryToolInterface {
tableName = TableNameHandle.addDoubleQuotes(tableName);
}
//获取查询指定表所有字段的sql语句
String querySql = sqlBuilder.getSQLQueryFields(tableName);
String querySql = sqlBuilder.getSQLQueryFields(tableSchema, tableName);
logger.info("querySql: {}", querySql);
try (Statement stmt = connection.createStatement();
ResultSet rs = stmt.executeQuery(querySql);

View File

@ -19,10 +19,11 @@ public interface QueryToolInterface {
/**
* 构建 tableInfo对象
*
* @param tableName 表名
* @param tableSchema 模式名
* @param tableName 表名
* @return
*/
TableInfo buildTableInfo(String tableName);
TableInfo buildTableInfo(String tableSchema, String tableName);
/**
* 获取指定表信息
@ -41,19 +42,22 @@ public interface QueryToolInterface {
/**
* 根据表名获取所有字段
*
* @param tableName
* @param tableSchema 模式名
* @param tableName 表名
* @return2
*/
List<ColumnInfo> getColumns(String tableName);
List<ColumnInfo> getColumns(String tableSchema, String tableName);
/**
* 根据表名和获取所有字段名称不包括表名
*
* @param tableName
* @param tableSchema
* @param dbType
* @return2
*/
List<String> getColumnNames(String tableName, DbType dbType);
List<String> getColumnNames(String tableName, String tableSchema, DbType dbType);
/**
@ -72,6 +76,7 @@ public interface QueryToolInterface {
/**
* 通过查询sql获取columns
*
* @param querySql
* @return
*/
@ -79,10 +84,11 @@ public interface QueryToolInterface {
/**
* 获取当前表maxId
*
* @param tableName
* @param primaryKey
* @return
*/
long getMaxIdVal(String tableName,String primaryKey);
long getMaxIdVal(String tableName, String primaryKey);
}

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

View File

@ -41,7 +41,7 @@ public class Hbase20xsqlQueryToolTest {
@Test
public void getColumnNames() {
List<String> columns = queryTool.getColumnNames("STOCK_SYMBOL", null);
List<String> columns = queryTool.getColumnNames("STOCK_SYMBOL", null, null);
for (String column : columns) {
System.out.println(column);

View File

@ -38,9 +38,9 @@ public class MySQLQueryToolTest {
@Test
public void getColumnNames() {
List<String> columns = queryTool.getColumnNames("job_config", jobDatasource.getType());
List<String> columns = queryTool.getColumnNames("job_config", null, jobDatasource.getType());
log.info(columns.toString());
columns = queryTool.getColumnNames("job_log", jobDatasource.getType());
columns = queryTool.getColumnNames("job_log", null, jobDatasource.getType());
log.info(columns.toString());
}

View File

@ -37,7 +37,7 @@ public class OracleQueryToolTest {
@Test
public void getColumnNames() {
List<String> columns = queryTool.getColumnNames("EMP",jdbcDatasource.getType());
List<String> columns = queryTool.getColumnNames("EMP", null, jdbcDatasource.getType());
log.info(columns.toString());
}
}

View File

@ -35,7 +35,7 @@ public class PostgresqlQueryToolTest {
@Test
public void getTableColumns() {
List<String> tableNames = queryTool.getColumnNames("BD_EMR_TYPE",jdbcDatasource.getType());
List<String> tableNames = queryTool.getColumnNames("BD_EMR_TYPE", null, jdbcDatasource.getType());
tableNames.forEach(System.out::println);
}

View File

@ -36,7 +36,7 @@ public class SqlServerQueryToolTest {
@Test
public void getColumnNames() {
List<String> columns = queryTool.getColumnNames("BD_EMR_TYPE",jdbcDatasource.getType());
List<String> columns = queryTool.getColumnNames("BD_EMR_TYPE", null, jdbcDatasource.getType());
log.info(columns.toString());
}
}

View File

@ -88,12 +88,7 @@
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>RELEASE</version>
<version>${lombok.version}</version>
<scope>compile</scope>
</dependency>
</dependencies>

View File

@ -58,6 +58,8 @@
<druid.version>1.2.3</druid.version>
<db2jcc.version>db2jcc4</db2jcc.version>
<lombok.version>1.18.22</lombok.version>
</properties>
</project>