From 3ecfb431a94fd4e435d94ff9c7ffe17b9b6041c4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=99=AF=E6=96=87=E5=87=AF?= Date: Wed, 4 Mar 2020 09:36:19 +0800 Subject: [PATCH] =?UTF-8?q?hbase=20json=E6=9E=84=E5=BB=BA=E5=AE=8C?= =?UTF-8?q?=E6=88=90?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../controller/JobDatasourceController.java | 3 +- .../wugui/datax/admin/dto/HbaseReaderDto.java | 7 +-- .../wugui/datax/admin/dto/HbaseWriterDto.java | 9 +-- .../java/com/wugui/datax/admin/dto/Range.java | 5 +- .../wugui/datax/admin/dto/RowkeyColumn.java | 3 + .../wugui/datax/admin/dto/VersionColumn.java | 2 + .../admin/service/JobDatasourceService.java | 4 +- .../impl/JobDatasourceServiceImpl.java | 11 +++- .../admin/tool/datax/DataxJsonHelper.java | 45 +++++++-------- .../admin/tool/datax/reader/HBaseReader.java | 8 ++- .../admin/tool/datax/writer/HBaseWriter.java | 55 ++++++++++--------- .../datax/admin/tool/pojo/DataxHbasePojo.java | 8 +-- 12 files changed, 82 insertions(+), 78 deletions(-) diff --git a/datax-admin/src/main/java/com/wugui/datax/admin/controller/JobDatasourceController.java b/datax-admin/src/main/java/com/wugui/datax/admin/controller/JobDatasourceController.java index f763ec88..15f0c8a0 100644 --- a/datax-admin/src/main/java/com/wugui/datax/admin/controller/JobDatasourceController.java +++ b/datax-admin/src/main/java/com/wugui/datax/admin/controller/JobDatasourceController.java @@ -16,6 +16,7 @@ import io.swagger.annotations.ApiOperation; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.*; +import java.io.IOException; import java.io.Serializable; import java.util.List; import java.util.Map; @@ -155,7 +156,7 @@ public class JobDatasourceController extends ApiController { */ @PostMapping("/test") @ApiOperation("测试数据") - public R dataSourceTest (@RequestBody JobDatasource jobJdbcDatasource) { + public R dataSourceTest (@RequestBody JobDatasource jobJdbcDatasource) throws IOException { return success(jobJdbcDatasourceService.dataSourceTest(jobJdbcDatasource)); } } \ No newline at end of file diff --git a/datax-admin/src/main/java/com/wugui/datax/admin/dto/HbaseReaderDto.java b/datax-admin/src/main/java/com/wugui/datax/admin/dto/HbaseReaderDto.java index 61bb49fa..7a56c470 100644 --- a/datax-admin/src/main/java/com/wugui/datax/admin/dto/HbaseReaderDto.java +++ b/datax-admin/src/main/java/com/wugui/datax/admin/dto/HbaseReaderDto.java @@ -3,16 +3,11 @@ package com.wugui.datax.admin.dto; import lombok.Data; import java.io.Serializable; -import java.util.List; @Data public class HbaseReaderDto implements Serializable { - private String readerHbaseConfig; - - private List readerTable; - - private String readerEncoding; + private String readerMaxVersion; private String readerMode; diff --git a/datax-admin/src/main/java/com/wugui/datax/admin/dto/HbaseWriterDto.java b/datax-admin/src/main/java/com/wugui/datax/admin/dto/HbaseWriterDto.java index fb45596b..ef14d3de 100644 --- a/datax-admin/src/main/java/com/wugui/datax/admin/dto/HbaseWriterDto.java +++ b/datax-admin/src/main/java/com/wugui/datax/admin/dto/HbaseWriterDto.java @@ -3,20 +3,15 @@ package com.wugui.datax.admin.dto; import lombok.Data; import java.io.Serializable; -import java.util.List; @Data public class HbaseWriterDto implements Serializable { - private String writerHbaseConfig; - - private List writerTable; - - private String writerEncoding; + private String writeNullMode; private String writerMode; private RowkeyColumn writerRowkeyColumn; - private VersionColumn writervVersionColumn; + private VersionColumn writerVersionColumn; } diff --git a/datax-admin/src/main/java/com/wugui/datax/admin/dto/Range.java b/datax-admin/src/main/java/com/wugui/datax/admin/dto/Range.java index 315b5183..e93d8d91 100644 --- a/datax-admin/src/main/java/com/wugui/datax/admin/dto/Range.java +++ b/datax-admin/src/main/java/com/wugui/datax/admin/dto/Range.java @@ -6,7 +6,10 @@ import java.io.Serializable; @Data public class Range implements Serializable { + private String startRowkey; + private String endRowkey; - private boolean isBinaryRowkey=true; + + private boolean isBinaryRowkey; } diff --git a/datax-admin/src/main/java/com/wugui/datax/admin/dto/RowkeyColumn.java b/datax-admin/src/main/java/com/wugui/datax/admin/dto/RowkeyColumn.java index 665689ba..b5fc0630 100644 --- a/datax-admin/src/main/java/com/wugui/datax/admin/dto/RowkeyColumn.java +++ b/datax-admin/src/main/java/com/wugui/datax/admin/dto/RowkeyColumn.java @@ -6,7 +6,10 @@ import java.io.Serializable; @Data public class RowkeyColumn implements Serializable { + private Integer index; + private String type; + private String value; } diff --git a/datax-admin/src/main/java/com/wugui/datax/admin/dto/VersionColumn.java b/datax-admin/src/main/java/com/wugui/datax/admin/dto/VersionColumn.java index 52fec043..8efc973b 100644 --- a/datax-admin/src/main/java/com/wugui/datax/admin/dto/VersionColumn.java +++ b/datax-admin/src/main/java/com/wugui/datax/admin/dto/VersionColumn.java @@ -4,6 +4,8 @@ import lombok.Data; @Data public class VersionColumn { + private Integer index; + private String value; } diff --git a/datax-admin/src/main/java/com/wugui/datax/admin/service/JobDatasourceService.java b/datax-admin/src/main/java/com/wugui/datax/admin/service/JobDatasourceService.java index 186ed1ec..32c56677 100644 --- a/datax-admin/src/main/java/com/wugui/datax/admin/service/JobDatasourceService.java +++ b/datax-admin/src/main/java/com/wugui/datax/admin/service/JobDatasourceService.java @@ -3,6 +3,8 @@ package com.wugui.datax.admin.service; import com.baomidou.mybatisplus.extension.service.IService; import com.wugui.datax.admin.entity.JobDatasource; +import java.io.IOException; + /** * jdbc数据源配置表服务接口 * @@ -16,5 +18,5 @@ public interface JobDatasourceService extends IService { * @param jdbcDatasource * @return */ - Boolean dataSourceTest(JobDatasource jdbcDatasource); + Boolean dataSourceTest(JobDatasource jdbcDatasource) throws IOException; } \ No newline at end of file diff --git a/datax-admin/src/main/java/com/wugui/datax/admin/service/impl/JobDatasourceServiceImpl.java b/datax-admin/src/main/java/com/wugui/datax/admin/service/impl/JobDatasourceServiceImpl.java index 248ec7fe..6d0cc89b 100644 --- a/datax-admin/src/main/java/com/wugui/datax/admin/service/impl/JobDatasourceServiceImpl.java +++ b/datax-admin/src/main/java/com/wugui/datax/admin/service/impl/JobDatasourceServiceImpl.java @@ -5,10 +5,14 @@ import com.wugui.datax.admin.mapper.JobJdbcDatasourceMapper; import com.wugui.datax.admin.entity.JobDatasource; import com.wugui.datax.admin.service.JobDatasourceService; import com.wugui.datax.admin.tool.query.BaseQueryTool; +import com.wugui.datax.admin.tool.query.HBaseQueryTool; import com.wugui.datax.admin.tool.query.QueryToolFactory; +import com.wugui.datax.admin.util.DataSourceConstants; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; +import java.io.IOException; + /** * jdbc数据源配置表服务实现类 * @@ -21,8 +25,11 @@ import org.springframework.transaction.annotation.Transactional; public class JobDatasourceServiceImpl extends ServiceImpl implements JobDatasourceService { @Override - public Boolean dataSourceTest(JobDatasource jdbcDatasource) { - BaseQueryTool queryTool = QueryToolFactory.getByDbType(jdbcDatasource); + public Boolean dataSourceTest(JobDatasource jobDatasource) throws IOException { + if (DataSourceConstants.HBASE.equals(jobDatasource.getDatasource())) { + return HBaseQueryTool.getInstance(jobDatasource).getFamily(); + } + BaseQueryTool queryTool = QueryToolFactory.getByDbType(jobDatasource); return queryTool.dataSourceTest(); } diff --git a/datax-admin/src/main/java/com/wugui/datax/admin/tool/datax/DataxJsonHelper.java b/datax-admin/src/main/java/com/wugui/datax/admin/tool/datax/DataxJsonHelper.java index 94d855ad..3ea5456e 100644 --- a/datax-admin/src/main/java/com/wugui/datax/admin/tool/datax/DataxJsonHelper.java +++ b/datax-admin/src/main/java/com/wugui/datax/admin/tool/datax/DataxJsonHelper.java @@ -212,6 +212,24 @@ public class DataxJsonHelper implements DataxJsonInterface { return readerPlugin.buildHive(dataxHivePojo); } + @Override + public Map buildHBaseReader() { + DataxHbasePojo dataxHbasePojo = new DataxHbasePojo(); + dataxHbasePojo.setJdbcDatasource(readerDatasource); + List> columns = Lists.newArrayList(); + for (int i = 0; i < readerColumns.size(); i++) { + Map column = Maps.newLinkedHashMap(); + column.put("name", readerColumns.get(i)); + column.put("type", "string"); + columns.add(column); + } + dataxHbasePojo.setColumns(columns); + dataxHbasePojo.setReaderHbaseConfig(readerDatasource.getZkAdress()); + dataxHbasePojo.setReaderTable(readerTables); + dataxHbasePojo.setReaderMode(hbaseReaderDto.getReaderMode()); + dataxHbasePojo.setReaderRange(hbaseReaderDto.getReaderRange()); + return readerPlugin.buildHbase(dataxHbasePojo); + } @Override public Map buildWriter() { DataxRdbmsPojo dataxPluginPojo = new DataxRdbmsPojo(); @@ -243,26 +261,6 @@ public class DataxJsonHelper implements DataxJsonInterface { return writerPlugin.buildHive(dataxHivePojo); } - @Override - public Map buildHBaseReader() { - DataxHbasePojo dataxHbasePojo = new DataxHbasePojo(); - dataxHbasePojo.setJdbcDatasource(readerDatasource); - List> columns = Lists.newArrayList(); - for (int i = 0; i < readerColumns.size(); i++) { - Map column = Maps.newLinkedHashMap(); - column.put("name", readerColumns.get(i)); - column.put("type", "string"); - columns.add(column); - } - dataxHbasePojo.setColumns(columns); - dataxHbasePojo.setReaderHbaseConfig(hbaseReaderDto.getReaderHbaseConfig()); - dataxHbasePojo.setReaderTable(hbaseReaderDto.getReaderTable()); - dataxHbasePojo.setReaderEncoding(hbaseReaderDto.getReaderEncoding()); - dataxHbasePojo.setReaderMode(hbaseReaderDto.getReaderMode()); - dataxHbasePojo.setReaderRange(hbaseReaderDto.getReaderRange()); - return readerPlugin.buildHbase(dataxHbasePojo); - } - @Override public Map buildHBaseWriter() { DataxHbasePojo dataxHbasePojo = new DataxHbasePojo(); @@ -276,10 +274,9 @@ public class DataxJsonHelper implements DataxJsonInterface { columns.add(column); } dataxHbasePojo.setColumns(columns); - dataxHbasePojo.setWriterHbaseConfig(hbaseWriterDto.getWriterHbaseConfig()); - dataxHbasePojo.setWriterTable(hbaseWriterDto.getWriterTable()); - dataxHbasePojo.setWriterEncoding(hbaseWriterDto.getWriterEncoding()); - dataxHbasePojo.setWriterVersionColumn(hbaseWriterDto.getWritervVersionColumn()); + dataxHbasePojo.setWriterHbaseConfig(writerDatasource.getZkAdress()); + dataxHbasePojo.setWriterTable(readerTables); + dataxHbasePojo.setWriterVersionColumn(hbaseWriterDto.getWriterVersionColumn()); dataxHbasePojo.setWriterRowkeyColumn(hbaseWriterDto.getWriterRowkeyColumn()); dataxHbasePojo.setWriterMode(hbaseWriterDto.getWriterMode()); return writerPlugin.buildHbase(dataxHbasePojo); diff --git a/datax-admin/src/main/java/com/wugui/datax/admin/tool/datax/reader/HBaseReader.java b/datax-admin/src/main/java/com/wugui/datax/admin/tool/datax/reader/HBaseReader.java index ddade0df..fd5848b7 100644 --- a/datax-admin/src/main/java/com/wugui/datax/admin/tool/datax/reader/HBaseReader.java +++ b/datax-admin/src/main/java/com/wugui/datax/admin/tool/datax/reader/HBaseReader.java @@ -2,6 +2,7 @@ package com.wugui.datax.admin.tool.datax.reader; import com.google.common.collect.Maps; import com.wugui.datax.admin.tool.pojo.DataxHbasePojo; +import org.apache.commons.lang3.StringUtils; import java.util.Map; @@ -25,11 +26,12 @@ public class HBaseReader extends BaseReaderPlugin implements DataxReaderInterfac confige.put("hbase.zookeeper.quorum",plugin.getReaderHbaseConfig()); parameterObj.put("hbaseConfig", confige); parameterObj.put("table", plugin.getReaderTable()); - parameterObj.put("encoding", plugin.getReaderEncoding()); parameterObj.put("mode", plugin.getReaderMode()); parameterObj.put("column", plugin.getColumns()); - parameterObj.put("range", plugin.getReaderRange()); - parameterObj.put("maxVersion", plugin.getReadmaxVersion()); + if(StringUtils.isNotBlank(plugin.getReaderRange().getStartRowkey()) && StringUtils.isNotBlank(plugin.getReaderRange().getEndRowkey())){ + parameterObj.put("range", plugin.getReaderRange()); + } + parameterObj.put("maxVersion", plugin.getReaderMaxVersion()); readerObj.put("parameter", parameterObj); return readerObj; } diff --git a/datax-admin/src/main/java/com/wugui/datax/admin/tool/datax/writer/HBaseWriter.java b/datax-admin/src/main/java/com/wugui/datax/admin/tool/datax/writer/HBaseWriter.java index c26e0825..0ab28a30 100644 --- a/datax-admin/src/main/java/com/wugui/datax/admin/tool/datax/writer/HBaseWriter.java +++ b/datax-admin/src/main/java/com/wugui/datax/admin/tool/datax/writer/HBaseWriter.java @@ -2,36 +2,39 @@ package com.wugui.datax.admin.tool.datax.writer; import com.google.common.collect.Maps; import com.wugui.datax.admin.tool.pojo.DataxHbasePojo; +import org.apache.commons.lang3.StringUtils; import java.util.Map; public class HBaseWriter extends BaseWriterPlugin implements DataxWriterInterface { - @Override - public String getName() { - return "hbase11xwriter"; - } + @Override + public String getName() { + return "hbase11xwriter"; + } - @Override - public Map sample() { - return null; - } + @Override + public Map sample() { + return null; + } - public Map buildHbase(DataxHbasePojo plugin) { - //构建 - Map readerObj = Maps.newLinkedHashMap(); - readerObj.put("name", getName()); - Map parameterObj = Maps.newLinkedHashMap(); - Map confige = Maps.newLinkedHashMap(); - confige.put("hbase.zookeeper.quorum",plugin.getWriterHbaseConfig()); - parameterObj.put("hbaseConfig", confige); - parameterObj.put("table", plugin.getWriterTable()); - parameterObj.put("encoding", plugin.getWriterEncoding()); - parameterObj.put("mode", plugin.getWriterMode()); - parameterObj.put("column", plugin.getColumns()); - parameterObj.put("range", plugin.getWriterRange()); - parameterObj.put("rowkeyColumn", plugin.getWriterRowkeyColumn()); - parameterObj.put("versionColumn",plugin.getWriterVersionColumn()); - readerObj.put("parameter", parameterObj); - return readerObj; - } + public Map buildHbase(DataxHbasePojo plugin) { + //构建 + Map readerObj = Maps.newLinkedHashMap(); + readerObj.put("name", getName()); + Map parameterObj = Maps.newLinkedHashMap(); + Map confige = Maps.newLinkedHashMap(); + confige.put("hbase.zookeeper.quorum", plugin.getWriterHbaseConfig()); + parameterObj.put("hbaseConfig", confige); + parameterObj.put("table", plugin.getWriterTable()); + parameterObj.put("mode", plugin.getWriterMode()); + parameterObj.put("column", plugin.getColumns()); + if (StringUtils.isNotBlank(plugin.getWriterRowkeyColumn().getType())) { + parameterObj.put("rowkeyColumn", plugin.getWriterRowkeyColumn()); + } + if (StringUtils.isNotBlank(plugin.getWriterVersionColumn().getValue())) { + parameterObj.put("versionColumn", plugin.getWriterVersionColumn()); + } + readerObj.put("parameter", parameterObj); + return readerObj; + } } diff --git a/datax-admin/src/main/java/com/wugui/datax/admin/tool/pojo/DataxHbasePojo.java b/datax-admin/src/main/java/com/wugui/datax/admin/tool/pojo/DataxHbasePojo.java index e0373b2a..65a70fea 100644 --- a/datax-admin/src/main/java/com/wugui/datax/admin/tool/pojo/DataxHbasePojo.java +++ b/datax-admin/src/main/java/com/wugui/datax/admin/tool/pojo/DataxHbasePojo.java @@ -24,11 +24,9 @@ public class DataxHbasePojo { private List readerTable; - private String readerEncoding; - private String readerMode; - private String readmaxVersion; + private String readerMaxVersion; private Range readerRange; @@ -36,12 +34,8 @@ public class DataxHbasePojo { private List writerTable; - private String writerEncoding; - private String writerMode; - private Range writerRange; - private VersionColumn writerVersionColumn; private RowkeyColumn writerRowkeyColumn;