hbase json构建完成

This commit is contained in:
景文凯 2020-03-04 09:36:19 +08:00
parent c004eb8680
commit 3ecfb431a9
12 changed files with 82 additions and 78 deletions

View File

@ -16,6 +16,7 @@ import io.swagger.annotations.ApiOperation;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;
import java.io.IOException;
import java.io.Serializable;
import java.util.List;
import java.util.Map;
@ -155,7 +156,7 @@ public class JobDatasourceController extends ApiController {
*/
@PostMapping("/test")
@ApiOperation("测试数据")
public R<Boolean> dataSourceTest (@RequestBody JobDatasource jobJdbcDatasource) {
public R<Boolean> dataSourceTest (@RequestBody JobDatasource jobJdbcDatasource) throws IOException {
return success(jobJdbcDatasourceService.dataSourceTest(jobJdbcDatasource));
}
}

View File

@ -3,16 +3,11 @@ package com.wugui.datax.admin.dto;
import lombok.Data;
import java.io.Serializable;
import java.util.List;
@Data
public class HbaseReaderDto implements Serializable {
private String readerHbaseConfig;
private List<String> readerTable;
private String readerEncoding;
private String readerMaxVersion;
private String readerMode;

View File

@ -3,20 +3,15 @@ package com.wugui.datax.admin.dto;
import lombok.Data;
import java.io.Serializable;
import java.util.List;
@Data
public class HbaseWriterDto implements Serializable {
private String writerHbaseConfig;
private List<String> writerTable;
private String writerEncoding;
private String writeNullMode;
private String writerMode;
private RowkeyColumn writerRowkeyColumn;
private VersionColumn writervVersionColumn;
private VersionColumn writerVersionColumn;
}

View File

@ -6,7 +6,10 @@ import java.io.Serializable;
@Data
public class Range implements Serializable {
private String startRowkey;
private String endRowkey;
private boolean isBinaryRowkey=true;
private boolean isBinaryRowkey;
}

View File

@ -6,7 +6,10 @@ import java.io.Serializable;
@Data
public class RowkeyColumn implements Serializable {
private Integer index;
private String type;
private String value;
}

View File

@ -4,6 +4,8 @@ import lombok.Data;
@Data
public class VersionColumn {
private Integer index;
private String value;
}

View File

@ -3,6 +3,8 @@ package com.wugui.datax.admin.service;
import com.baomidou.mybatisplus.extension.service.IService;
import com.wugui.datax.admin.entity.JobDatasource;
import java.io.IOException;
/**
* jdbc数据源配置表服务接口
*
@ -16,5 +18,5 @@ public interface JobDatasourceService extends IService<JobDatasource> {
* @param jdbcDatasource
* @return
*/
Boolean dataSourceTest(JobDatasource jdbcDatasource);
Boolean dataSourceTest(JobDatasource jdbcDatasource) throws IOException;
}

View File

@ -5,10 +5,14 @@ import com.wugui.datax.admin.mapper.JobJdbcDatasourceMapper;
import com.wugui.datax.admin.entity.JobDatasource;
import com.wugui.datax.admin.service.JobDatasourceService;
import com.wugui.datax.admin.tool.query.BaseQueryTool;
import com.wugui.datax.admin.tool.query.HBaseQueryTool;
import com.wugui.datax.admin.tool.query.QueryToolFactory;
import com.wugui.datax.admin.util.DataSourceConstants;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import java.io.IOException;
/**
* jdbc数据源配置表服务实现类
*
@ -21,8 +25,11 @@ import org.springframework.transaction.annotation.Transactional;
public class JobDatasourceServiceImpl extends ServiceImpl<JobJdbcDatasourceMapper, JobDatasource> implements JobDatasourceService {
@Override
public Boolean dataSourceTest(JobDatasource jdbcDatasource) {
BaseQueryTool queryTool = QueryToolFactory.getByDbType(jdbcDatasource);
public Boolean dataSourceTest(JobDatasource jobDatasource) throws IOException {
if (DataSourceConstants.HBASE.equals(jobDatasource.getDatasource())) {
return HBaseQueryTool.getInstance(jobDatasource).getFamily();
}
BaseQueryTool queryTool = QueryToolFactory.getByDbType(jobDatasource);
return queryTool.dataSourceTest();
}

View File

@ -212,6 +212,24 @@ public class DataxJsonHelper implements DataxJsonInterface {
return readerPlugin.buildHive(dataxHivePojo);
}
@Override
public Map<String, Object> buildHBaseReader() {
DataxHbasePojo dataxHbasePojo = new DataxHbasePojo();
dataxHbasePojo.setJdbcDatasource(readerDatasource);
List<Map<String, Object>> columns = Lists.newArrayList();
for (int i = 0; i < readerColumns.size(); i++) {
Map<String, Object> column = Maps.newLinkedHashMap();
column.put("name", readerColumns.get(i));
column.put("type", "string");
columns.add(column);
}
dataxHbasePojo.setColumns(columns);
dataxHbasePojo.setReaderHbaseConfig(readerDatasource.getZkAdress());
dataxHbasePojo.setReaderTable(readerTables);
dataxHbasePojo.setReaderMode(hbaseReaderDto.getReaderMode());
dataxHbasePojo.setReaderRange(hbaseReaderDto.getReaderRange());
return readerPlugin.buildHbase(dataxHbasePojo);
}
@Override
public Map<String, Object> buildWriter() {
DataxRdbmsPojo dataxPluginPojo = new DataxRdbmsPojo();
@ -243,26 +261,6 @@ public class DataxJsonHelper implements DataxJsonInterface {
return writerPlugin.buildHive(dataxHivePojo);
}
@Override
public Map<String, Object> buildHBaseReader() {
DataxHbasePojo dataxHbasePojo = new DataxHbasePojo();
dataxHbasePojo.setJdbcDatasource(readerDatasource);
List<Map<String, Object>> columns = Lists.newArrayList();
for (int i = 0; i < readerColumns.size(); i++) {
Map<String, Object> column = Maps.newLinkedHashMap();
column.put("name", readerColumns.get(i));
column.put("type", "string");
columns.add(column);
}
dataxHbasePojo.setColumns(columns);
dataxHbasePojo.setReaderHbaseConfig(hbaseReaderDto.getReaderHbaseConfig());
dataxHbasePojo.setReaderTable(hbaseReaderDto.getReaderTable());
dataxHbasePojo.setReaderEncoding(hbaseReaderDto.getReaderEncoding());
dataxHbasePojo.setReaderMode(hbaseReaderDto.getReaderMode());
dataxHbasePojo.setReaderRange(hbaseReaderDto.getReaderRange());
return readerPlugin.buildHbase(dataxHbasePojo);
}
@Override
public Map<String, Object> buildHBaseWriter() {
DataxHbasePojo dataxHbasePojo = new DataxHbasePojo();
@ -276,10 +274,9 @@ public class DataxJsonHelper implements DataxJsonInterface {
columns.add(column);
}
dataxHbasePojo.setColumns(columns);
dataxHbasePojo.setWriterHbaseConfig(hbaseWriterDto.getWriterHbaseConfig());
dataxHbasePojo.setWriterTable(hbaseWriterDto.getWriterTable());
dataxHbasePojo.setWriterEncoding(hbaseWriterDto.getWriterEncoding());
dataxHbasePojo.setWriterVersionColumn(hbaseWriterDto.getWritervVersionColumn());
dataxHbasePojo.setWriterHbaseConfig(writerDatasource.getZkAdress());
dataxHbasePojo.setWriterTable(readerTables);
dataxHbasePojo.setWriterVersionColumn(hbaseWriterDto.getWriterVersionColumn());
dataxHbasePojo.setWriterRowkeyColumn(hbaseWriterDto.getWriterRowkeyColumn());
dataxHbasePojo.setWriterMode(hbaseWriterDto.getWriterMode());
return writerPlugin.buildHbase(dataxHbasePojo);

View File

@ -2,6 +2,7 @@ package com.wugui.datax.admin.tool.datax.reader;
import com.google.common.collect.Maps;
import com.wugui.datax.admin.tool.pojo.DataxHbasePojo;
import org.apache.commons.lang3.StringUtils;
import java.util.Map;
@ -25,11 +26,12 @@ public class HBaseReader extends BaseReaderPlugin implements DataxReaderInterfac
confige.put("hbase.zookeeper.quorum",plugin.getReaderHbaseConfig());
parameterObj.put("hbaseConfig", confige);
parameterObj.put("table", plugin.getReaderTable());
parameterObj.put("encoding", plugin.getReaderEncoding());
parameterObj.put("mode", plugin.getReaderMode());
parameterObj.put("column", plugin.getColumns());
parameterObj.put("range", plugin.getReaderRange());
parameterObj.put("maxVersion", plugin.getReadmaxVersion());
if(StringUtils.isNotBlank(plugin.getReaderRange().getStartRowkey()) && StringUtils.isNotBlank(plugin.getReaderRange().getEndRowkey())){
parameterObj.put("range", plugin.getReaderRange());
}
parameterObj.put("maxVersion", plugin.getReaderMaxVersion());
readerObj.put("parameter", parameterObj);
return readerObj;
}

View File

@ -2,36 +2,39 @@ package com.wugui.datax.admin.tool.datax.writer;
import com.google.common.collect.Maps;
import com.wugui.datax.admin.tool.pojo.DataxHbasePojo;
import org.apache.commons.lang3.StringUtils;
import java.util.Map;
public class HBaseWriter extends BaseWriterPlugin implements DataxWriterInterface {
@Override
public String getName() {
return "hbase11xwriter";
}
@Override
public String getName() {
return "hbase11xwriter";
}
@Override
public Map<String, Object> sample() {
return null;
}
@Override
public Map<String, Object> sample() {
return null;
}
public Map<String, Object> buildHbase(DataxHbasePojo plugin) {
//构建
Map<String, Object> readerObj = Maps.newLinkedHashMap();
readerObj.put("name", getName());
Map<String, Object> parameterObj = Maps.newLinkedHashMap();
Map<String, Object> confige = Maps.newLinkedHashMap();
confige.put("hbase.zookeeper.quorum",plugin.getWriterHbaseConfig());
parameterObj.put("hbaseConfig", confige);
parameterObj.put("table", plugin.getWriterTable());
parameterObj.put("encoding", plugin.getWriterEncoding());
parameterObj.put("mode", plugin.getWriterMode());
parameterObj.put("column", plugin.getColumns());
parameterObj.put("range", plugin.getWriterRange());
parameterObj.put("rowkeyColumn", plugin.getWriterRowkeyColumn());
parameterObj.put("versionColumn",plugin.getWriterVersionColumn());
readerObj.put("parameter", parameterObj);
return readerObj;
}
public Map<String, Object> buildHbase(DataxHbasePojo plugin) {
//构建
Map<String, Object> readerObj = Maps.newLinkedHashMap();
readerObj.put("name", getName());
Map<String, Object> parameterObj = Maps.newLinkedHashMap();
Map<String, Object> confige = Maps.newLinkedHashMap();
confige.put("hbase.zookeeper.quorum", plugin.getWriterHbaseConfig());
parameterObj.put("hbaseConfig", confige);
parameterObj.put("table", plugin.getWriterTable());
parameterObj.put("mode", plugin.getWriterMode());
parameterObj.put("column", plugin.getColumns());
if (StringUtils.isNotBlank(plugin.getWriterRowkeyColumn().getType())) {
parameterObj.put("rowkeyColumn", plugin.getWriterRowkeyColumn());
}
if (StringUtils.isNotBlank(plugin.getWriterVersionColumn().getValue())) {
parameterObj.put("versionColumn", plugin.getWriterVersionColumn());
}
readerObj.put("parameter", parameterObj);
return readerObj;
}
}

View File

@ -24,11 +24,9 @@ public class DataxHbasePojo {
private List<String> readerTable;
private String readerEncoding;
private String readerMode;
private String readmaxVersion;
private String readerMaxVersion;
private Range readerRange;
@ -36,12 +34,8 @@ public class DataxHbasePojo {
private List<String> writerTable;
private String writerEncoding;
private String writerMode;
private Range writerRange;
private VersionColumn writerVersionColumn;
private RowkeyColumn writerRowkeyColumn;