From 11ee79bfcc3d63ecd48066b0fad3e5d2802c2c7e Mon Sep 17 00:00:00 2001 From: zhouhongfa Date: Wed, 31 Jul 2019 15:13:54 +0800 Subject: [PATCH] =?UTF-8?q?=E5=AE=9E=E7=8E=B0mysqlreader=20=E5=92=8C=20mys?= =?UTF-8?q?qlwriter=E7=9A=84datax=20json=E6=9E=84=E5=BB=BA=EF=BC=9B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../com/wugui/tool/datax/BaseDataxPlugin.java | 22 +++++ .../com/wugui/tool/datax/DataxJsonHelper.java | 89 +++++++++++++++++-- .../wugui/tool/datax/DataxJsonInterface.java | 2 +- .../tool/datax/DataxPluginInterface.java | 13 ++- .../wugui/tool/datax/reader/MysqlReader.java | 48 +++++----- .../wugui/tool/datax/writer/MysqlWriter.java | 49 +++++----- .../com/wugui/tool/pojo/DataxPluginPojo.java | 34 +++++++ .../java/com/wugui/tool/util/JSONUtils.java | 25 ++++++ .../wugui/tool/datax/DataxJsonHelperTest.java | 72 +++++++++++++++ .../wugui/tool/query/MySQLQueryToolTest.java | 62 +++++++++++++ 10 files changed, 358 insertions(+), 58 deletions(-) create mode 100644 datax-web/src/main/java/com/wugui/tool/datax/BaseDataxPlugin.java create mode 100644 datax-web/src/main/java/com/wugui/tool/pojo/DataxPluginPojo.java create mode 100644 datax-web/src/main/java/com/wugui/tool/util/JSONUtils.java create mode 100644 datax-web/src/test/java/com/wugui/tool/datax/DataxJsonHelperTest.java create mode 100644 datax-web/src/test/java/com/wugui/tool/query/MySQLQueryToolTest.java diff --git a/datax-web/src/main/java/com/wugui/tool/datax/BaseDataxPlugin.java b/datax-web/src/main/java/com/wugui/tool/datax/BaseDataxPlugin.java new file mode 100644 index 00000000..063650d9 --- /dev/null +++ b/datax-web/src/main/java/com/wugui/tool/datax/BaseDataxPlugin.java @@ -0,0 +1,22 @@ +package com.wugui.tool.datax; + +import java.util.Map; + +/** + * 抽象实现类 + * + * @author zhouhongfa@gz-yibo.com + * @ClassName BaseDataxPlugin + * @Version 1.0 + * @since 2019/7/31 9:45 + */ +public abstract class BaseDataxPlugin implements DataxPluginInterface { + + protected Map extraParams; + + + @Override + public void extraParams(Map extraParams) { + this.extraParams = extraParams; + } +} diff --git a/datax-web/src/main/java/com/wugui/tool/datax/DataxJsonHelper.java b/datax-web/src/main/java/com/wugui/tool/datax/DataxJsonHelper.java index 24f5f948..97fc45ae 100644 --- a/datax-web/src/main/java/com/wugui/tool/datax/DataxJsonHelper.java +++ b/datax-web/src/main/java/com/wugui/tool/datax/DataxJsonHelper.java @@ -1,6 +1,14 @@ package com.wugui.tool.datax; +import com.alibaba.druid.util.JdbcConstants; +import com.alibaba.druid.util.JdbcUtils; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Maps; import com.wugui.dataxweb.entity.JobJdbcDatasource; +import com.wugui.tool.datax.reader.MysqlReader; +import com.wugui.tool.datax.writer.MysqlWriter; +import com.wugui.tool.pojo.DataxPluginPojo; import java.util.List; import java.util.Map; @@ -22,6 +30,11 @@ public class DataxJsonHelper implements DataxJsonInterface { */ private List readerTables; + /** + * 读取的字段列表 + */ + private List readerColumns; + /** * reader jdbc 数据源 */ @@ -37,28 +50,88 @@ public class DataxJsonHelper implements DataxJsonInterface { */ private List writerTables; - @Override - public Map buildJob() { - return null; + /** + * 写入的字段列表 + */ + private List writerColumns; + + private BaseDataxPlugin readerPlugin; + + private BaseDataxPlugin writerPlugin; + + public void initReader(JobJdbcDatasource readerDatasource, List readerTables, List readerColumns) { + this.readerTables = readerTables; + this.readerColumns = readerColumns; + this.readerDatasource = readerDatasource; + // reader 插件 + String readerDbType = JdbcUtils.getDbType(readerDatasource.getJdbcUrl(), readerDatasource.getJdbcDriverClass()); + if (JdbcConstants.MYSQL.equals(readerDbType)) { + readerPlugin = new MysqlReader(); + } + } + + public void initWriter(JobJdbcDatasource writerDatasource, List writerTables, List writerColumns) { + this.writerDatasource = writerDatasource; + this.writerTables = writerTables; + this.writerColumns = writerColumns; + // writer + String writerDbType = JdbcUtils.getDbType(writerDatasource.getJdbcUrl(), writerDatasource.getJdbcDriverClass()); + if (JdbcConstants.MYSQL.equals(writerDbType)) { + writerPlugin = new MysqlWriter(); + } } @Override - public Map builSetting() { - return null; + public Map buildJob() { + + Map res = Maps.newLinkedHashMap(); + + Map jobMap = Maps.newLinkedHashMap(); + jobMap.put("setting", buildSetting()); + jobMap.put("content", ImmutableList.of(buildContent())); + + res.put("job", jobMap); + return res; + } + + @Override + public Map buildSetting() { + Map res = Maps.newLinkedHashMap(); + Map speedMap = Maps.newLinkedHashMap(); + Map errorLimitMap = Maps.newLinkedHashMap(); + speedMap.putAll(ImmutableMap.of("channel", 3)); + errorLimitMap.putAll(ImmutableMap.of("record", 0, "percentage", 0.02)); + res.put("speed", speedMap); + res.put("errorLimit", errorLimitMap); + return res; } @Override public Map buildContent() { - return null; + Map res = Maps.newLinkedHashMap(); + res.put("reader", buildReader()); + res.put("writer", buildWriter()); + return res; } @Override public Map buildReader() { - return null; + + DataxPluginPojo dataxPluginPojo = new DataxPluginPojo(); + dataxPluginPojo.setJdbcDatasource(readerDatasource); + dataxPluginPojo.setTables(readerTables); + dataxPluginPojo.setColumns(readerColumns); + + return readerPlugin.build(dataxPluginPojo); } @Override public Map buildWriter() { - return null; + DataxPluginPojo dataxPluginPojo = new DataxPluginPojo(); + dataxPluginPojo.setJdbcDatasource(writerDatasource); + dataxPluginPojo.setTables(writerTables); + dataxPluginPojo.setColumns(writerColumns); + + return writerPlugin.build(dataxPluginPojo); } } diff --git a/datax-web/src/main/java/com/wugui/tool/datax/DataxJsonInterface.java b/datax-web/src/main/java/com/wugui/tool/datax/DataxJsonInterface.java index 80aed657..e5eec971 100644 --- a/datax-web/src/main/java/com/wugui/tool/datax/DataxJsonInterface.java +++ b/datax-web/src/main/java/com/wugui/tool/datax/DataxJsonInterface.java @@ -14,7 +14,7 @@ public interface DataxJsonInterface { Map buildJob(); - Map builSetting(); + Map buildSetting(); Map buildContent(); diff --git a/datax-web/src/main/java/com/wugui/tool/datax/DataxPluginInterface.java b/datax-web/src/main/java/com/wugui/tool/datax/DataxPluginInterface.java index 132b44dc..1e54fde4 100644 --- a/datax-web/src/main/java/com/wugui/tool/datax/DataxPluginInterface.java +++ b/datax-web/src/main/java/com/wugui/tool/datax/DataxPluginInterface.java @@ -1,5 +1,7 @@ package com.wugui.tool.datax; +import com.wugui.tool.pojo.DataxPluginPojo; + import java.util.Map; /** @@ -21,9 +23,9 @@ public interface DataxPluginInterface { /** * 构建 * - * @return + * @return dataxPluginPojo */ - Map build(); + Map build(DataxPluginPojo dataxPluginPojo); /** * 获取示例 @@ -31,4 +33,11 @@ public interface DataxPluginInterface { * @return */ Map sample(); + + /** + * 传递一些额外的参数 + * + * @return extraParams + */ + void extraParams(Map extraParams); } diff --git a/datax-web/src/main/java/com/wugui/tool/datax/reader/MysqlReader.java b/datax-web/src/main/java/com/wugui/tool/datax/reader/MysqlReader.java index e6e3aa62..d63dca2f 100644 --- a/datax-web/src/main/java/com/wugui/tool/datax/reader/MysqlReader.java +++ b/datax-web/src/main/java/com/wugui/tool/datax/reader/MysqlReader.java @@ -1,51 +1,51 @@ package com.wugui.tool.datax.reader; +import com.google.common.collect.ImmutableList; import com.google.common.collect.Maps; +import com.wugui.dataxweb.entity.JobJdbcDatasource; +import com.wugui.tool.datax.BaseDataxPlugin; +import com.wugui.tool.pojo.DataxPluginPojo; import java.util.Map; /** - * TODO + * mysql reader 构建类 * * @author zhouhongfa@gz-yibo.com * @ClassName MysqlReader * @Version 1.0 * @since 2019/7/30 23:07 */ -public class MysqlReader implements DataxReaderInterface { +public class MysqlReader extends BaseDataxPlugin implements DataxReaderInterface { @Override public String getName() { return "mysqlreader"; } @Override - public Map build() { - //获取表信息 -// TableInfo tableInfo = buildTableInfo(tableName); - + public Map build(DataxPluginPojo dataxPluginPojo) { //构建 Map readerObj = Maps.newLinkedHashMap(); -// readerObj.put("name", getReaderName()); + readerObj.put("name", getName()); // -// Map parameterObj = Maps.newLinkedHashMap(); -// parameterObj.put("username", jobJdbcDatasource.getJdbcUsername()); -// parameterObj.put("password", jobJdbcDatasource.getJdbcPassword()); + Map parameterObj = Maps.newLinkedHashMap(); + + JobJdbcDatasource jobJdbcDatasource = dataxPluginPojo.getJdbcDatasource(); + parameterObj.put("username", jobJdbcDatasource.getJdbcUsername()); + parameterObj.put("password", jobJdbcDatasource.getJdbcPassword()); + //列表 + parameterObj.put("column", dataxPluginPojo.getColumns()); // -// List columns = Lists.newArrayList(); -// //列表 -// tableInfo.getColumns().stream().forEach(e -> columns.add(e.getName())); -// parameterObj.put("column", columns); -// -// Map connectionObj = Maps.newLinkedHashMap(); -// connectionObj.put("table", ImmutableList.of(tableInfo.getName())); -// //where -//// connectionObj.put("where", "1=2"); -// connectionObj.put("jdbcUrl", ImmutableList.of(jobJdbcDatasource.getJdbcUrl())); -// -// parameterObj.put("connection", ImmutableList.of(connectionObj)); -// -// readerObj.put("parameter", parameterObj); + Map connectionObj = Maps.newLinkedHashMap(); + connectionObj.put("table", dataxPluginPojo.getTables()); + //where +// connectionObj.put("where", "1=2"); + connectionObj.put("jdbcUrl", ImmutableList.of(jobJdbcDatasource.getJdbcUrl())); + + parameterObj.put("connection", ImmutableList.of(connectionObj)); + + readerObj.put("parameter", parameterObj); return readerObj; } diff --git a/datax-web/src/main/java/com/wugui/tool/datax/writer/MysqlWriter.java b/datax-web/src/main/java/com/wugui/tool/datax/writer/MysqlWriter.java index 355082cf..35bcf385 100644 --- a/datax-web/src/main/java/com/wugui/tool/datax/writer/MysqlWriter.java +++ b/datax-web/src/main/java/com/wugui/tool/datax/writer/MysqlWriter.java @@ -1,25 +1,29 @@ package com.wugui.tool.datax.writer; +import com.google.common.collect.ImmutableList; import com.google.common.collect.Maps; +import com.wugui.dataxweb.entity.JobJdbcDatasource; +import com.wugui.tool.datax.BaseDataxPlugin; +import com.wugui.tool.pojo.DataxPluginPojo; import java.util.Map; /** - * TODO + * mysql writer构建类 * * @author zhouhongfa@gz-yibo.com * @ClassName MysqlWriter * @Version 1.0 * @since 2019/7/30 23:08 */ -public class MysqlWriter implements DataxWriterInterface { +public class MysqlWriter extends BaseDataxPlugin implements DataxWriterInterface { @Override public String getName() { return "mysqlwriter"; } @Override - public Map build() { + public Map build(DataxPluginPojo dataxPluginPojo) { //获取表信息 // TableInfo tableInfo = buildTableInfo(tableName); @@ -27,26 +31,25 @@ public class MysqlWriter implements DataxWriterInterface { // Map res = Maps.newLinkedHashMap(); Map writerObj = Maps.newLinkedHashMap(); // -// writerObj.put("name", getWriterName()); -// -// Map parameterObj = Maps.newLinkedHashMap(); -// parameterObj.put("writeMode", "insert"); -// parameterObj.put("username", jobJdbcDatasource.getJdbcUsername()); -// parameterObj.put("password", jobJdbcDatasource.getJdbcPassword()); -// -// List columns = Lists.newArrayList(); -// //列表 -// tableInfo.getColumns().stream().forEach(e -> columns.add(e.getName())); -// parameterObj.put("column", columns); -// -// Map connectionObj = Maps.newLinkedHashMap(); -// connectionObj.put("table", ImmutableList.of(tableInfo.getName())); -// -// connectionObj.put("jdbcUrl", jobJdbcDatasource.getJdbcUrl()); -// -// parameterObj.put("connection", ImmutableList.of(connectionObj)); -// -// writerObj.put("parameter", parameterObj); + writerObj.put("name", getName()); + + Map parameterObj = Maps.newLinkedHashMap(); + parameterObj.put("writeMode", "insert"); + + JobJdbcDatasource jobJdbcDatasource = dataxPluginPojo.getJdbcDatasource(); + parameterObj.put("username", jobJdbcDatasource.getJdbcUsername()); + parameterObj.put("password", jobJdbcDatasource.getJdbcPassword()); + + parameterObj.put("column", dataxPluginPojo.getColumns()); + + Map connectionObj = Maps.newLinkedHashMap(); + connectionObj.put("table", dataxPluginPojo.getTables()); + + connectionObj.put("jdbcUrl", jobJdbcDatasource.getJdbcUrl()); + + parameterObj.put("connection", ImmutableList.of(connectionObj)); + + writerObj.put("parameter", parameterObj); return writerObj; } diff --git a/datax-web/src/main/java/com/wugui/tool/pojo/DataxPluginPojo.java b/datax-web/src/main/java/com/wugui/tool/pojo/DataxPluginPojo.java new file mode 100644 index 00000000..c1d2f81f --- /dev/null +++ b/datax-web/src/main/java/com/wugui/tool/pojo/DataxPluginPojo.java @@ -0,0 +1,34 @@ +package com.wugui.tool.pojo; + +import com.wugui.dataxweb.entity.JobJdbcDatasource; +import lombok.Data; + +import java.util.List; + +/** + * 用于传参,构建json + * + * @author zhouhongfa@gz-yibo.com + * @ClassName DataxPluginPojo + * @Version 1.0 + * @since 2019/7/31 9:26 + */ +@Data +public class DataxPluginPojo { + + /** + * 表名 + */ + private List tables; + + /** + * 列名 + */ + private List columns; + + /** + * 数据源信息 + */ + private JobJdbcDatasource jdbcDatasource; + +} diff --git a/datax-web/src/main/java/com/wugui/tool/util/JSONUtils.java b/datax-web/src/main/java/com/wugui/tool/util/JSONUtils.java new file mode 100644 index 00000000..2363e6d8 --- /dev/null +++ b/datax-web/src/main/java/com/wugui/tool/util/JSONUtils.java @@ -0,0 +1,25 @@ +package com.wugui.tool.util; + +import cn.hutool.json.JSONUtil; +import com.alibaba.fastjson.JSON; + +/** + * TODO + * + * @author zhouhongfa@gz-yibo.com + * @ClassName JSONUtils + * @Version 1.0 + * @since 2019/7/31 14:54 + */ +public class JSONUtils { + + /** + * 返回格式化的json + * + * @param object + * @return + */ + public static String formatJson(Object object) { + return JSONUtil.formatJsonStr(JSON.toJSONString(object)); + } +} diff --git a/datax-web/src/test/java/com/wugui/tool/datax/DataxJsonHelperTest.java b/datax-web/src/test/java/com/wugui/tool/datax/DataxJsonHelperTest.java new file mode 100644 index 00000000..0348509f --- /dev/null +++ b/datax-web/src/test/java/com/wugui/tool/datax/DataxJsonHelperTest.java @@ -0,0 +1,72 @@ +package com.wugui.tool.datax; + +import com.google.common.collect.ImmutableList; +import com.wugui.dataxweb.entity.JobJdbcDatasource; +import com.wugui.tool.util.JSONUtils; +import org.junit.Test; + +import java.util.Map; + +public class DataxJsonHelperTest { + + private JobJdbcDatasource getReaderDatasource() { + JobJdbcDatasource readerDatasource = new JobJdbcDatasource(); + readerDatasource.setDatasourceName("z01_mysql_3306"); + readerDatasource.setJdbcUsername("root"); + readerDatasource.setJdbcPassword("root"); + readerDatasource.setJdbcUrl("jdbc:mysql://z01:3306/datax_web?serverTimezone=Asia/Shanghai&useLegacyDatetimeCode=false&useSSL=false&nullNamePatternMatchesAll=true&useUnicode=true&characterEncoding=UTF-8"); + readerDatasource.setJdbcDriverClass("com.mysql.jdbc.Driver"); + return readerDatasource; + } + + private JobJdbcDatasource getWriterDatasource() { + JobJdbcDatasource writerDatasource = new JobJdbcDatasource(); + writerDatasource.setDatasourceName("z01_mysql_3306"); + writerDatasource.setJdbcUsername("root"); + writerDatasource.setJdbcPassword("root"); + writerDatasource.setJdbcUrl("jdbc:mysql://z01:3306/datax_web_demo?serverTimezone=Asia/Shanghai&useLegacyDatetimeCode=false&useSSL=false&nullNamePatternMatchesAll=true&useUnicode=true&characterEncoding=UTF-8"); + writerDatasource.setJdbcDriverClass("com.mysql.jdbc.Driver"); + return writerDatasource; + } + + @Test + public void buildJob() { + DataxJsonHelper dataxJsonHelper = new DataxJsonHelper(); + dataxJsonHelper.initReader(getReaderDatasource(), ImmutableList.of("datax_plugin"), ImmutableList.of("id")); + dataxJsonHelper.initWriter(getWriterDatasource(), ImmutableList.of("datax_plugin"), ImmutableList.of("id")); + Map map = dataxJsonHelper.buildJob(); + System.out.println(JSONUtils.formatJson(map)); + } + + @Test + public void builSetting() { + DataxJsonHelper dataxJsonHelper = new DataxJsonHelper(); + Map map = dataxJsonHelper.buildSetting(); + System.out.println(JSONUtils.formatJson(map)); + } + + @Test + public void buildContent() { + DataxJsonHelper dataxJsonHelper = new DataxJsonHelper(); + dataxJsonHelper.initReader(getReaderDatasource(), ImmutableList.of("datax_plugin"), ImmutableList.of("id")); + dataxJsonHelper.initWriter(getWriterDatasource(), ImmutableList.of("datax_plugin"), ImmutableList.of("id")); + Map map = dataxJsonHelper.buildContent(); + System.out.println(JSONUtils.formatJson(map)); + } + + @Test + public void buildReader() { + DataxJsonHelper dataxJsonHelper = new DataxJsonHelper(); + dataxJsonHelper.initReader(getReaderDatasource(), ImmutableList.of("datax_plugin"), ImmutableList.of("id")); + Map reader = dataxJsonHelper.buildReader(); + System.out.println(JSONUtils.formatJson(reader)); + } + + @Test + public void buildWriter() { + DataxJsonHelper dataxJsonHelper = new DataxJsonHelper(); + dataxJsonHelper.initWriter(getWriterDatasource(), ImmutableList.of("datax_plugin"), ImmutableList.of("id")); + Map writer = dataxJsonHelper.buildWriter(); + System.out.println(JSONUtils.formatJson(writer)); + } +} \ No newline at end of file diff --git a/datax-web/src/test/java/com/wugui/tool/query/MySQLQueryToolTest.java b/datax-web/src/test/java/com/wugui/tool/query/MySQLQueryToolTest.java new file mode 100644 index 00000000..ac26d831 --- /dev/null +++ b/datax-web/src/test/java/com/wugui/tool/query/MySQLQueryToolTest.java @@ -0,0 +1,62 @@ +package com.wugui.tool.query; + +import com.wugui.dataxweb.entity.JobJdbcDatasource; +import com.wugui.tool.database.TableInfo; +import lombok.extern.slf4j.Slf4j; +import org.junit.Before; +import org.junit.Test; + +import java.util.List; +import java.util.Map; + +@Slf4j +public class MySQLQueryToolTest { + + private BaseQueryTool readQueryTool; + private BaseQueryTool writerQueryTool; + private JobJdbcDatasource readerDatasource; + private JobJdbcDatasource writerDatasource; + + @Before + public void before() { + genMysqlDemo(); + readQueryTool = QueryToolFactory.getByDbType(readerDatasource); + writerQueryTool = QueryToolFactory.getByDbType(writerDatasource); + } + + private void genMysqlDemo() { + readerDatasource = new JobJdbcDatasource(); + readerDatasource.setDatasourceName("z01_mysql_3306"); + readerDatasource.setJdbcUsername("root"); + readerDatasource.setJdbcPassword("root"); + readerDatasource.setJdbcUrl("jdbc:mysql://z01:3306/datax_web?serverTimezone=Asia/Shanghai&useLegacyDatetimeCode=false&useSSL=false&nullNamePatternMatchesAll=true&useUnicode=true&characterEncoding=UTF-8"); + readerDatasource.setJdbcDriverClass("com.mysql.jdbc.Driver"); + + writerDatasource = new JobJdbcDatasource(); + writerDatasource.setDatasourceName("z01_mysql_3306"); + writerDatasource.setJdbcUsername("root"); + writerDatasource.setJdbcPassword("root"); + writerDatasource.setJdbcUrl("jdbc:mysql://z01:3306/datax_web_demo?serverTimezone=Asia/Shanghai&useLegacyDatetimeCode=false&useSSL=false&nullNamePatternMatchesAll=true&useUnicode=true&characterEncoding=UTF-8"); + writerDatasource.setJdbcDriverClass("com.mysql.jdbc.Driver"); + } + + @Test + public void getTableInfo() { + List> tableInfo = readQueryTool.getTableInfo("datax_plugin"); + tableInfo.forEach(e -> { + log.info(e.toString()); + }); + } + + @Test + public void buildTableInfo() { + TableInfo tableInfo = readQueryTool.buildTableInfo("datax_plugin"); + log.info(tableInfo.toString()); + } + + @Test + public void getTables() { + List> tables = readQueryTool.getTables(); + tables.forEach(e -> log.info(e.toString())); + } +} \ No newline at end of file