实现mysqlreader 和 mysqlwriter的datax json构建;

This commit is contained in:
zhouhongfa 2019-07-31 15:13:54 +08:00
parent 899f455815
commit 11ee79bfcc
10 changed files with 358 additions and 58 deletions

View File

@ -0,0 +1,22 @@
package com.wugui.tool.datax;
import java.util.Map;
/**
* 抽象实现类
*
* @author zhouhongfa@gz-yibo.com
* @ClassName BaseDataxPlugin
* @Version 1.0
* @since 2019/7/31 9:45
*/
public abstract class BaseDataxPlugin implements DataxPluginInterface {
protected Map<String, Object> extraParams;
@Override
public void extraParams(Map<String, Object> extraParams) {
this.extraParams = extraParams;
}
}

View File

@ -1,6 +1,14 @@
package com.wugui.tool.datax;
import com.alibaba.druid.util.JdbcConstants;
import com.alibaba.druid.util.JdbcUtils;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
import com.wugui.dataxweb.entity.JobJdbcDatasource;
import com.wugui.tool.datax.reader.MysqlReader;
import com.wugui.tool.datax.writer.MysqlWriter;
import com.wugui.tool.pojo.DataxPluginPojo;
import java.util.List;
import java.util.Map;
@ -22,6 +30,11 @@ public class DataxJsonHelper implements DataxJsonInterface {
*/
private List<String> readerTables;
/**
* 读取的字段列表
*/
private List<String> readerColumns;
/**
* reader jdbc 数据源
*/
@ -37,28 +50,88 @@ public class DataxJsonHelper implements DataxJsonInterface {
*/
private List<String> writerTables;
@Override
public Map<String, Object> buildJob() {
return null;
/**
* 写入的字段列表
*/
private List<String> writerColumns;
private BaseDataxPlugin readerPlugin;
private BaseDataxPlugin writerPlugin;
public void initReader(JobJdbcDatasource readerDatasource, List<String> readerTables, List<String> readerColumns) {
this.readerTables = readerTables;
this.readerColumns = readerColumns;
this.readerDatasource = readerDatasource;
// reader 插件
String readerDbType = JdbcUtils.getDbType(readerDatasource.getJdbcUrl(), readerDatasource.getJdbcDriverClass());
if (JdbcConstants.MYSQL.equals(readerDbType)) {
readerPlugin = new MysqlReader();
}
}
public void initWriter(JobJdbcDatasource writerDatasource, List<String> writerTables, List<String> writerColumns) {
this.writerDatasource = writerDatasource;
this.writerTables = writerTables;
this.writerColumns = writerColumns;
// writer
String writerDbType = JdbcUtils.getDbType(writerDatasource.getJdbcUrl(), writerDatasource.getJdbcDriverClass());
if (JdbcConstants.MYSQL.equals(writerDbType)) {
writerPlugin = new MysqlWriter();
}
}
@Override
public Map<String, Object> builSetting() {
return null;
public Map<String, Object> buildJob() {
Map<String, Object> res = Maps.newLinkedHashMap();
Map<String, Object> jobMap = Maps.newLinkedHashMap();
jobMap.put("setting", buildSetting());
jobMap.put("content", ImmutableList.of(buildContent()));
res.put("job", jobMap);
return res;
}
@Override
public Map<String, Object> buildSetting() {
Map<String, Object> res = Maps.newLinkedHashMap();
Map<String, Object> speedMap = Maps.newLinkedHashMap();
Map<String, Object> errorLimitMap = Maps.newLinkedHashMap();
speedMap.putAll(ImmutableMap.of("channel", 3));
errorLimitMap.putAll(ImmutableMap.of("record", 0, "percentage", 0.02));
res.put("speed", speedMap);
res.put("errorLimit", errorLimitMap);
return res;
}
@Override
public Map<String, Object> buildContent() {
return null;
Map<String, Object> res = Maps.newLinkedHashMap();
res.put("reader", buildReader());
res.put("writer", buildWriter());
return res;
}
@Override
public Map<String, Object> buildReader() {
return null;
DataxPluginPojo dataxPluginPojo = new DataxPluginPojo();
dataxPluginPojo.setJdbcDatasource(readerDatasource);
dataxPluginPojo.setTables(readerTables);
dataxPluginPojo.setColumns(readerColumns);
return readerPlugin.build(dataxPluginPojo);
}
@Override
public Map<String, Object> buildWriter() {
return null;
DataxPluginPojo dataxPluginPojo = new DataxPluginPojo();
dataxPluginPojo.setJdbcDatasource(writerDatasource);
dataxPluginPojo.setTables(writerTables);
dataxPluginPojo.setColumns(writerColumns);
return writerPlugin.build(dataxPluginPojo);
}
}

View File

@ -14,7 +14,7 @@ public interface DataxJsonInterface {
Map<String, Object> buildJob();
Map<String, Object> builSetting();
Map<String, Object> buildSetting();
Map<String, Object> buildContent();

View File

@ -1,5 +1,7 @@
package com.wugui.tool.datax;
import com.wugui.tool.pojo.DataxPluginPojo;
import java.util.Map;
/**
@ -21,9 +23,9 @@ public interface DataxPluginInterface {
/**
* 构建
*
* @return
* @return dataxPluginPojo
*/
Map<String, Object> build();
Map<String, Object> build(DataxPluginPojo dataxPluginPojo);
/**
* 获取示例
@ -31,4 +33,11 @@ public interface DataxPluginInterface {
* @return
*/
Map<String, Object> sample();
/**
* 传递一些额外的参数
*
* @return extraParams
*/
void extraParams(Map<String, Object> extraParams);
}

View File

@ -1,51 +1,51 @@
package com.wugui.tool.datax.reader;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Maps;
import com.wugui.dataxweb.entity.JobJdbcDatasource;
import com.wugui.tool.datax.BaseDataxPlugin;
import com.wugui.tool.pojo.DataxPluginPojo;
import java.util.Map;
/**
* TODO
* mysql reader 构建类
*
* @author zhouhongfa@gz-yibo.com
* @ClassName MysqlReader
* @Version 1.0
* @since 2019/7/30 23:07
*/
public class MysqlReader implements DataxReaderInterface {
public class MysqlReader extends BaseDataxPlugin implements DataxReaderInterface {
@Override
public String getName() {
return "mysqlreader";
}
@Override
public Map<String, Object> build() {
//获取表信息
// TableInfo tableInfo = buildTableInfo(tableName);
public Map<String, Object> build(DataxPluginPojo dataxPluginPojo) {
//构建
Map<String, Object> readerObj = Maps.newLinkedHashMap();
// readerObj.put("name", getReaderName());
readerObj.put("name", getName());
//
// Map<String, Object> parameterObj = Maps.newLinkedHashMap();
// parameterObj.put("username", jobJdbcDatasource.getJdbcUsername());
// parameterObj.put("password", jobJdbcDatasource.getJdbcPassword());
Map<String, Object> parameterObj = Maps.newLinkedHashMap();
JobJdbcDatasource jobJdbcDatasource = dataxPluginPojo.getJdbcDatasource();
parameterObj.put("username", jobJdbcDatasource.getJdbcUsername());
parameterObj.put("password", jobJdbcDatasource.getJdbcPassword());
//列表
parameterObj.put("column", dataxPluginPojo.getColumns());
//
// List<String> columns = Lists.newArrayList();
// //列表
// tableInfo.getColumns().stream().forEach(e -> columns.add(e.getName()));
// parameterObj.put("column", columns);
//
// Map<String, Object> connectionObj = Maps.newLinkedHashMap();
// connectionObj.put("table", ImmutableList.of(tableInfo.getName()));
// //where
//// connectionObj.put("where", "1=2");
// connectionObj.put("jdbcUrl", ImmutableList.of(jobJdbcDatasource.getJdbcUrl()));
//
// parameterObj.put("connection", ImmutableList.of(connectionObj));
//
// readerObj.put("parameter", parameterObj);
Map<String, Object> connectionObj = Maps.newLinkedHashMap();
connectionObj.put("table", dataxPluginPojo.getTables());
//where
// connectionObj.put("where", "1=2");
connectionObj.put("jdbcUrl", ImmutableList.of(jobJdbcDatasource.getJdbcUrl()));
parameterObj.put("connection", ImmutableList.of(connectionObj));
readerObj.put("parameter", parameterObj);
return readerObj;
}

View File

@ -1,25 +1,29 @@
package com.wugui.tool.datax.writer;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Maps;
import com.wugui.dataxweb.entity.JobJdbcDatasource;
import com.wugui.tool.datax.BaseDataxPlugin;
import com.wugui.tool.pojo.DataxPluginPojo;
import java.util.Map;
/**
* TODO
* mysql writer构建类
*
* @author zhouhongfa@gz-yibo.com
* @ClassName MysqlWriter
* @Version 1.0
* @since 2019/7/30 23:08
*/
public class MysqlWriter implements DataxWriterInterface {
public class MysqlWriter extends BaseDataxPlugin implements DataxWriterInterface {
@Override
public String getName() {
return "mysqlwriter";
}
@Override
public Map<String, Object> build() {
public Map<String, Object> build(DataxPluginPojo dataxPluginPojo) {
//获取表信息
// TableInfo tableInfo = buildTableInfo(tableName);
@ -27,26 +31,25 @@ public class MysqlWriter implements DataxWriterInterface {
// Map<String, Object> res = Maps.newLinkedHashMap();
Map<String, Object> writerObj = Maps.newLinkedHashMap();
//
// writerObj.put("name", getWriterName());
//
// Map<String, Object> parameterObj = Maps.newLinkedHashMap();
// parameterObj.put("writeMode", "insert");
// parameterObj.put("username", jobJdbcDatasource.getJdbcUsername());
// parameterObj.put("password", jobJdbcDatasource.getJdbcPassword());
//
// List<String> columns = Lists.newArrayList();
// //列表
// tableInfo.getColumns().stream().forEach(e -> columns.add(e.getName()));
// parameterObj.put("column", columns);
//
// Map<String, Object> connectionObj = Maps.newLinkedHashMap();
// connectionObj.put("table", ImmutableList.of(tableInfo.getName()));
//
// connectionObj.put("jdbcUrl", jobJdbcDatasource.getJdbcUrl());
//
// parameterObj.put("connection", ImmutableList.of(connectionObj));
//
// writerObj.put("parameter", parameterObj);
writerObj.put("name", getName());
Map<String, Object> parameterObj = Maps.newLinkedHashMap();
parameterObj.put("writeMode", "insert");
JobJdbcDatasource jobJdbcDatasource = dataxPluginPojo.getJdbcDatasource();
parameterObj.put("username", jobJdbcDatasource.getJdbcUsername());
parameterObj.put("password", jobJdbcDatasource.getJdbcPassword());
parameterObj.put("column", dataxPluginPojo.getColumns());
Map<String, Object> connectionObj = Maps.newLinkedHashMap();
connectionObj.put("table", dataxPluginPojo.getTables());
connectionObj.put("jdbcUrl", jobJdbcDatasource.getJdbcUrl());
parameterObj.put("connection", ImmutableList.of(connectionObj));
writerObj.put("parameter", parameterObj);
return writerObj;
}

View File

@ -0,0 +1,34 @@
package com.wugui.tool.pojo;
import com.wugui.dataxweb.entity.JobJdbcDatasource;
import lombok.Data;
import java.util.List;
/**
* 用于传参构建json
*
* @author zhouhongfa@gz-yibo.com
* @ClassName DataxPluginPojo
* @Version 1.0
* @since 2019/7/31 9:26
*/
@Data
public class DataxPluginPojo {
/**
* 表名
*/
private List<String> tables;
/**
* 列名
*/
private List<String> columns;
/**
* 数据源信息
*/
private JobJdbcDatasource jdbcDatasource;
}

View File

@ -0,0 +1,25 @@
package com.wugui.tool.util;
import cn.hutool.json.JSONUtil;
import com.alibaba.fastjson.JSON;
/**
* TODO
*
* @author zhouhongfa@gz-yibo.com
* @ClassName JSONUtils
* @Version 1.0
* @since 2019/7/31 14:54
*/
public class JSONUtils {
/**
* 返回格式化的json
*
* @param object
* @return
*/
public static String formatJson(Object object) {
return JSONUtil.formatJsonStr(JSON.toJSONString(object));
}
}

View File

@ -0,0 +1,72 @@
package com.wugui.tool.datax;
import com.google.common.collect.ImmutableList;
import com.wugui.dataxweb.entity.JobJdbcDatasource;
import com.wugui.tool.util.JSONUtils;
import org.junit.Test;
import java.util.Map;
public class DataxJsonHelperTest {
private JobJdbcDatasource getReaderDatasource() {
JobJdbcDatasource readerDatasource = new JobJdbcDatasource();
readerDatasource.setDatasourceName("z01_mysql_3306");
readerDatasource.setJdbcUsername("root");
readerDatasource.setJdbcPassword("root");
readerDatasource.setJdbcUrl("jdbc:mysql://z01:3306/datax_web?serverTimezone=Asia/Shanghai&useLegacyDatetimeCode=false&useSSL=false&nullNamePatternMatchesAll=true&useUnicode=true&characterEncoding=UTF-8");
readerDatasource.setJdbcDriverClass("com.mysql.jdbc.Driver");
return readerDatasource;
}
private JobJdbcDatasource getWriterDatasource() {
JobJdbcDatasource writerDatasource = new JobJdbcDatasource();
writerDatasource.setDatasourceName("z01_mysql_3306");
writerDatasource.setJdbcUsername("root");
writerDatasource.setJdbcPassword("root");
writerDatasource.setJdbcUrl("jdbc:mysql://z01:3306/datax_web_demo?serverTimezone=Asia/Shanghai&useLegacyDatetimeCode=false&useSSL=false&nullNamePatternMatchesAll=true&useUnicode=true&characterEncoding=UTF-8");
writerDatasource.setJdbcDriverClass("com.mysql.jdbc.Driver");
return writerDatasource;
}
@Test
public void buildJob() {
DataxJsonHelper dataxJsonHelper = new DataxJsonHelper();
dataxJsonHelper.initReader(getReaderDatasource(), ImmutableList.of("datax_plugin"), ImmutableList.of("id"));
dataxJsonHelper.initWriter(getWriterDatasource(), ImmutableList.of("datax_plugin"), ImmutableList.of("id"));
Map<String, Object> map = dataxJsonHelper.buildJob();
System.out.println(JSONUtils.formatJson(map));
}
@Test
public void builSetting() {
DataxJsonHelper dataxJsonHelper = new DataxJsonHelper();
Map<String, Object> map = dataxJsonHelper.buildSetting();
System.out.println(JSONUtils.formatJson(map));
}
@Test
public void buildContent() {
DataxJsonHelper dataxJsonHelper = new DataxJsonHelper();
dataxJsonHelper.initReader(getReaderDatasource(), ImmutableList.of("datax_plugin"), ImmutableList.of("id"));
dataxJsonHelper.initWriter(getWriterDatasource(), ImmutableList.of("datax_plugin"), ImmutableList.of("id"));
Map<String, Object> map = dataxJsonHelper.buildContent();
System.out.println(JSONUtils.formatJson(map));
}
@Test
public void buildReader() {
DataxJsonHelper dataxJsonHelper = new DataxJsonHelper();
dataxJsonHelper.initReader(getReaderDatasource(), ImmutableList.of("datax_plugin"), ImmutableList.of("id"));
Map<String, Object> reader = dataxJsonHelper.buildReader();
System.out.println(JSONUtils.formatJson(reader));
}
@Test
public void buildWriter() {
DataxJsonHelper dataxJsonHelper = new DataxJsonHelper();
dataxJsonHelper.initWriter(getWriterDatasource(), ImmutableList.of("datax_plugin"), ImmutableList.of("id"));
Map<String, Object> writer = dataxJsonHelper.buildWriter();
System.out.println(JSONUtils.formatJson(writer));
}
}

View File

@ -0,0 +1,62 @@
package com.wugui.tool.query;
import com.wugui.dataxweb.entity.JobJdbcDatasource;
import com.wugui.tool.database.TableInfo;
import lombok.extern.slf4j.Slf4j;
import org.junit.Before;
import org.junit.Test;
import java.util.List;
import java.util.Map;
@Slf4j
public class MySQLQueryToolTest {
private BaseQueryTool readQueryTool;
private BaseQueryTool writerQueryTool;
private JobJdbcDatasource readerDatasource;
private JobJdbcDatasource writerDatasource;
@Before
public void before() {
genMysqlDemo();
readQueryTool = QueryToolFactory.getByDbType(readerDatasource);
writerQueryTool = QueryToolFactory.getByDbType(writerDatasource);
}
private void genMysqlDemo() {
readerDatasource = new JobJdbcDatasource();
readerDatasource.setDatasourceName("z01_mysql_3306");
readerDatasource.setJdbcUsername("root");
readerDatasource.setJdbcPassword("root");
readerDatasource.setJdbcUrl("jdbc:mysql://z01:3306/datax_web?serverTimezone=Asia/Shanghai&useLegacyDatetimeCode=false&useSSL=false&nullNamePatternMatchesAll=true&useUnicode=true&characterEncoding=UTF-8");
readerDatasource.setJdbcDriverClass("com.mysql.jdbc.Driver");
writerDatasource = new JobJdbcDatasource();
writerDatasource.setDatasourceName("z01_mysql_3306");
writerDatasource.setJdbcUsername("root");
writerDatasource.setJdbcPassword("root");
writerDatasource.setJdbcUrl("jdbc:mysql://z01:3306/datax_web_demo?serverTimezone=Asia/Shanghai&useLegacyDatetimeCode=false&useSSL=false&nullNamePatternMatchesAll=true&useUnicode=true&characterEncoding=UTF-8");
writerDatasource.setJdbcDriverClass("com.mysql.jdbc.Driver");
}
@Test
public void getTableInfo() {
List<Map<String, Object>> tableInfo = readQueryTool.getTableInfo("datax_plugin");
tableInfo.forEach(e -> {
log.info(e.toString());
});
}
@Test
public void buildTableInfo() {
TableInfo tableInfo = readQueryTool.buildTableInfo("datax_plugin");
log.info(tableInfo.toString());
}
@Test
public void getTables() {
List<Map<String, Object>> tables = readQueryTool.getTables();
tables.forEach(e -> log.info(e.toString()));
}
}