diff --git a/datax-admin/src/main/java/com/wugui/datax/admin/config/ControllerAspect.java b/datax-admin/src/main/java/com/wugui/datax/admin/config/ControllerAspect.java index 17a49fb9..15502a59 100644 --- a/datax-admin/src/main/java/com/wugui/datax/admin/config/ControllerAspect.java +++ b/datax-admin/src/main/java/com/wugui/datax/admin/config/ControllerAspect.java @@ -38,7 +38,9 @@ public class ControllerAspect { String method = signature.getDeclaringTypeName().replace(clsPath, "") + "." + signature.getName(); long start = System.currentTimeMillis(); String input = Arrays.toString(joinPoint.getArgs()); - log.info("controller:{},input:{}", method, input); + if (!method.contains("JobApiController")) { + log.info("controller:{},input:{}", method, input); + } try { Object resObj = joinPoint.proceed(); return resObj; diff --git a/datax-admin/src/main/java/com/wugui/datax/admin/config/MybatisPlusConfig.java b/datax-admin/src/main/java/com/wugui/datax/admin/config/MybatisPlusConfig.java index ae0ff557..eecb753b 100644 --- a/datax-admin/src/main/java/com/wugui/datax/admin/config/MybatisPlusConfig.java +++ b/datax-admin/src/main/java/com/wugui/datax/admin/config/MybatisPlusConfig.java @@ -1,7 +1,7 @@ package com.wugui.datax.admin.config; +import com.baomidou.mybatisplus.core.injector.DefaultSqlInjector; import com.baomidou.mybatisplus.core.injector.ISqlInjector; -import com.baomidou.mybatisplus.extension.injector.LogicSqlInjector; import com.baomidou.mybatisplus.extension.plugins.PaginationInterceptor; import org.mybatis.spring.annotation.MapperScan; import org.springframework.context.annotation.Bean; @@ -10,6 +10,7 @@ import org.springframework.transaction.annotation.EnableTransactionManagement; /** * MybatisPlus配置类 Spring boot方式 + * * @author huzekang */ @EnableTransactionManagement @@ -29,11 +30,15 @@ public class MybatisPlusConfig { /** * MyBatisPlus逻辑删除 ,需要在 yml 中配置开启 + * 3.0.7.1版本的LogicSqlInjector里面什么都没做只是 extends DefaultSqlInjector + * 以后版本直接去的了LogicSqlInjector * * @return */ @Bean public ISqlInjector sqlInjector() { - return new LogicSqlInjector(); + return new DefaultSqlInjector(); } + + } diff --git a/datax-admin/src/main/java/com/wugui/datax/admin/core/handler/AESEncryptHandler.java b/datax-admin/src/main/java/com/wugui/datax/admin/core/handler/AESEncryptHandler.java new file mode 100644 index 00000000..bec2ecbf --- /dev/null +++ b/datax-admin/src/main/java/com/wugui/datax/admin/core/handler/AESEncryptHandler.java @@ -0,0 +1,44 @@ +package com.wugui.datax.admin.core.handler; + +import com.wugui.datax.admin.util.AESUtil; +import org.apache.ibatis.type.BaseTypeHandler; +import org.apache.ibatis.type.JdbcType; +import org.apache.ibatis.type.MappedTypes; + +import java.sql.CallableStatement; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; + +/** + * @author water + * @date 20-03-17 下午5:38 + */ +@MappedTypes({String.class}) +public class AESEncryptHandler extends BaseTypeHandler { + + + @Override + public void setNonNullParameter(PreparedStatement ps, int i, String parameter, JdbcType jdbcType) throws SQLException { + ps.setString(i, AESUtil.encrypt(parameter)); + } + + @Override + public String getNullableResult(ResultSet rs, String columnName) throws SQLException { + String columnValue = rs.getString(columnName); + return AESUtil.decrypt(columnValue); + } + + @Override + public String getNullableResult(ResultSet rs, int columnIndex) throws SQLException { + String columnValue = rs.getString(columnIndex); + return AESUtil.decrypt(columnValue); + } + + @Override + public String getNullableResult(CallableStatement cs, int columnIndex) + throws SQLException { + String columnValue = cs.getString(columnIndex); + return AESUtil.decrypt(columnValue); + } +} \ No newline at end of file diff --git a/datax-admin/src/main/java/com/wugui/datax/admin/config/MybatisMetaObjectHandler.java b/datax-admin/src/main/java/com/wugui/datax/admin/core/handler/MybatisMetaObjectHandler.java similarity index 95% rename from datax-admin/src/main/java/com/wugui/datax/admin/config/MybatisMetaObjectHandler.java rename to datax-admin/src/main/java/com/wugui/datax/admin/core/handler/MybatisMetaObjectHandler.java index 651abc90..68a7107a 100644 --- a/datax-admin/src/main/java/com/wugui/datax/admin/config/MybatisMetaObjectHandler.java +++ b/datax-admin/src/main/java/com/wugui/datax/admin/core/handler/MybatisMetaObjectHandler.java @@ -1,4 +1,4 @@ -package com.wugui.datax.admin.config; +package com.wugui.datax.admin.core.handler; import com.baomidou.mybatisplus.core.handlers.MetaObjectHandler; import lombok.extern.slf4j.Slf4j; diff --git a/datax-admin/src/main/java/com/wugui/datax/admin/entity/JobJdbcDatasource.java b/datax-admin/src/main/java/com/wugui/datax/admin/entity/JobJdbcDatasource.java index cf4932de..0503ca18 100644 --- a/datax-admin/src/main/java/com/wugui/datax/admin/entity/JobJdbcDatasource.java +++ b/datax-admin/src/main/java/com/wugui/datax/admin/entity/JobJdbcDatasource.java @@ -3,6 +3,7 @@ package com.wugui.datax.admin.entity; import com.alibaba.fastjson.annotation.JSONField; import com.baomidou.mybatisplus.annotation.*; import com.baomidou.mybatisplus.extension.activerecord.Model; +import com.wugui.datax.admin.core.handler.AESEncryptHandler; import io.swagger.annotations.ApiModel; import io.swagger.annotations.ApiModelProperty; import lombok.Data; @@ -50,13 +51,17 @@ public class JobJdbcDatasource extends Model { /** * 用户名 + * AESEncryptHandler 加密类 + * MyBatis Plus 3.0.7.1之前版本没有typeHandler属性,需要升级到最低3.1.2 */ @ApiModelProperty(value = "用户名") + @TableField(typeHandler = AESEncryptHandler.class) private String jdbcUsername; /** * 密码 */ + @TableField(typeHandler = AESEncryptHandler.class) @ApiModelProperty(value = "密码") private String jdbcPassword; diff --git a/datax-admin/src/main/java/com/wugui/datax/admin/tool/datax/reader/BaseReaderPlugin.java b/datax-admin/src/main/java/com/wugui/datax/admin/tool/datax/reader/BaseReaderPlugin.java index bc2158b7..084acc41 100644 --- a/datax-admin/src/main/java/com/wugui/datax/admin/tool/datax/reader/BaseReaderPlugin.java +++ b/datax-admin/src/main/java/com/wugui/datax/admin/tool/datax/reader/BaseReaderPlugin.java @@ -7,6 +7,7 @@ import com.wugui.datax.admin.entity.JobJdbcDatasource; import com.wugui.datax.admin.tool.datax.BaseDataxPlugin; import com.wugui.datax.admin.tool.pojo.DataxHivePojo; import com.wugui.datax.admin.tool.pojo.DataxRdbmsPojo; +import com.wugui.datax.admin.util.AESUtil; import org.apache.commons.lang3.StringUtils; import java.util.Map; @@ -26,15 +27,13 @@ public abstract class BaseReaderPlugin extends BaseDataxPlugin { public Map build(DataxRdbmsPojo plugin) { //构建 Map readerObj = Maps.newLinkedHashMap(); - readerObj.put("name", getName()); -// Map parameterObj = Maps.newLinkedHashMap(); Map connectionObj = Maps.newLinkedHashMap(); JobJdbcDatasource jobJdbcDatasource = plugin.getJdbcDatasource(); - parameterObj.put("username", jobJdbcDatasource.getJdbcUsername()); - parameterObj.put("password", jobJdbcDatasource.getJdbcPassword()); + parameterObj.put("username", AESUtil.decrypt(jobJdbcDatasource.getJdbcUsername())); + parameterObj.put("password", AESUtil.decrypt(jobJdbcDatasource.getJdbcPassword())); //判断是否是 querySql if (StrUtil.isNotBlank(plugin.getQuerySql())) { diff --git a/datax-admin/src/main/java/com/wugui/datax/admin/tool/datax/writer/BaseWriterPlugin.java b/datax-admin/src/main/java/com/wugui/datax/admin/tool/datax/writer/BaseWriterPlugin.java index dc093242..84493673 100644 --- a/datax-admin/src/main/java/com/wugui/datax/admin/tool/datax/writer/BaseWriterPlugin.java +++ b/datax-admin/src/main/java/com/wugui/datax/admin/tool/datax/writer/BaseWriterPlugin.java @@ -6,6 +6,7 @@ import com.wugui.datax.admin.entity.JobJdbcDatasource; import com.wugui.datax.admin.tool.datax.BaseDataxPlugin; import com.wugui.datax.admin.tool.pojo.DataxHivePojo; import com.wugui.datax.admin.tool.pojo.DataxRdbmsPojo; +import com.wugui.datax.admin.util.AESUtil; import java.util.Map; @@ -26,8 +27,8 @@ public abstract class BaseWriterPlugin extends BaseDataxPlugin { Map parameterObj = Maps.newLinkedHashMap(); // parameterObj.put("writeMode", "insert"); JobJdbcDatasource jobJdbcDatasource = plugin.getJdbcDatasource(); - parameterObj.put("username", jobJdbcDatasource.getJdbcUsername()); - parameterObj.put("password", jobJdbcDatasource.getJdbcPassword()); + parameterObj.put("username", AESUtil.decrypt(jobJdbcDatasource.getJdbcUsername())); + parameterObj.put("password", AESUtil.decrypt(jobJdbcDatasource.getJdbcPassword())); parameterObj.put("column", plugin.getRdbmsColumns()); // preSql parameterObj.put("preSql", ImmutableList.of(plugin.getPreSql())); diff --git a/datax-admin/src/main/java/com/wugui/datax/admin/tool/query/BaseQueryTool.java b/datax-admin/src/main/java/com/wugui/datax/admin/tool/query/BaseQueryTool.java index bae3f6bf..e8dfe62c 100644 --- a/datax-admin/src/main/java/com/wugui/datax/admin/tool/query/BaseQueryTool.java +++ b/datax-admin/src/main/java/com/wugui/datax/admin/tool/query/BaseQueryTool.java @@ -13,6 +13,7 @@ import com.wugui.datax.admin.tool.database.DasColumn; import com.wugui.datax.admin.tool.database.TableInfo; import com.wugui.datax.admin.tool.meta.DatabaseInterface; import com.wugui.datax.admin.tool.meta.DatabaseMetaFactory; +import com.wugui.datax.admin.util.AESUtil; import com.zaxxer.hikari.HikariDataSource; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -54,11 +55,12 @@ public abstract class BaseQueryTool implements QueryToolInterface { */ BaseQueryTool(JobJdbcDatasource jobJdbcDatasource) throws SQLException { String currentDbType = JdbcUtils.getDbType(jobJdbcDatasource.getJdbcUrl(), jobJdbcDatasource.getJdbcDriverClass()); + String userName = AESUtil.decrypt(jobJdbcDatasource.getJdbcUsername()); if (LocalCacheUtil.get(jobJdbcDatasource.getDatasourceName()) == null) { //这里默认使用 hikari 数据源 HikariDataSource dataSource = new HikariDataSource(); - dataSource.setUsername(jobJdbcDatasource.getJdbcUsername()); - dataSource.setPassword(jobJdbcDatasource.getJdbcPassword()); + dataSource.setUsername(userName); + dataSource.setPassword(AESUtil.decrypt(jobJdbcDatasource.getJdbcPassword())); dataSource.setJdbcUrl(jobJdbcDatasource.getJdbcUrl()); dataSource.setDriverClassName(jobJdbcDatasource.getJdbcDriverClass()); dataSource.setMaximumPoolSize(1); @@ -70,7 +72,7 @@ public abstract class BaseQueryTool implements QueryToolInterface { this.connection = (Connection) LocalCacheUtil.get(jobJdbcDatasource.getDatasourceName()); } sqlBuilder = DatabaseMetaFactory.getByDbType(currentDbType); - currentSchema = getSchema(jobJdbcDatasource.getJdbcUsername()); + currentSchema = getSchema(userName); LocalCacheUtil.set(jobJdbcDatasource.getDatasourceName(), this.connection, 4 * 60 * 60 * 1000); } diff --git a/datax-admin/src/main/java/com/wugui/datax/admin/util/AESUtil.java b/datax-admin/src/main/java/com/wugui/datax/admin/util/AESUtil.java new file mode 100644 index 00000000..d17f36b5 --- /dev/null +++ b/datax-admin/src/main/java/com/wugui/datax/admin/util/AESUtil.java @@ -0,0 +1,92 @@ +package com.wugui.datax.admin.util; + +import javax.crypto.Cipher; +import javax.crypto.spec.IvParameterSpec; +import javax.crypto.spec.SecretKeySpec; + +import org.apache.commons.codec.binary.Base64; + +public class AESUtil { + // 密钥 + public static String key = "AD42F6697B035B7580E4FEF93BE20BAD"; + private static String charset = "utf-8"; + // 偏移量 + private static int offset = 16; + private static String transformation = "AES/CBC/PKCS5Padding"; + private static String algorithm = "AES"; + + /** + * 加密 + * + * @param content + * @return + */ + public static String encrypt(String content) { + return encrypt(content, key); + } + + /** + * 解密 + * + * @param content + * @return + */ + public static String decrypt(String content) { + return decrypt(content, key); + } + + /** + * 加密 + * + * @param content 需要加密的内容 + * @param key 加密密码 + * @return + */ + public static String encrypt(String content, String key) { + try { + SecretKeySpec skey = new SecretKeySpec(key.getBytes(), algorithm); + IvParameterSpec iv = new IvParameterSpec(key.getBytes(), 0, offset); + Cipher cipher = Cipher.getInstance(transformation); + byte[] byteContent = content.getBytes(charset); + cipher.init(Cipher.ENCRYPT_MODE, skey, iv);// 初始化 + byte[] result = cipher.doFinal(byteContent); + return new Base64().encodeToString(result); // 加密 + } catch (Exception e) { + // LogUtil.exception(e); + } + return null; + } + + /** + * AES(256)解密 + * + * @param content 待解密内容 + * @param key 解密密钥 + * @return 解密之后 + * @throws Exception + */ + public static String decrypt(String content, String key) { + try { + + SecretKeySpec skey = new SecretKeySpec(key.getBytes(), algorithm); + IvParameterSpec iv = new IvParameterSpec(key.getBytes(), 0, offset); + Cipher cipher = Cipher.getInstance(transformation); + cipher.init(Cipher.DECRYPT_MODE, skey, iv);// 初始化 + byte[] result = cipher.doFinal(new Base64().decode(content)); + return new String(result); // 解密 + } catch (Exception e) { + //LogUtil.exception(e); + } + return null; + } + + public static void main(String[] args) throws Exception { + String s = "root"; + // 加密 + System.out.println("加密前:" + s); + String encryptResultStr = encrypt(s); + System.out.println("加密后:" + encryptResultStr); + // 解密 + System.out.println("解密后:" + decrypt(encryptResultStr)); + } +} \ No newline at end of file diff --git a/datax-admin/src/main/resources/application.yml b/datax-admin/src/main/resources/application.yml index 9f19c9d0..b299670d 100644 --- a/datax-admin/src/main/resources/application.yml +++ b/datax-admin/src/main/resources/application.yml @@ -77,6 +77,7 @@ mybatis-plus: cache-enabled: false call-setters-on-nulls: true jdbc-type-for-null: 'null' + type-handlers-package: com.wugui.datax.admin.core.handler # 配置mybatis-plus打印slq日志 logging: diff --git a/datax-admin/src/main/resources/mybatis-mapper/JobJdbcDatasourceMapper.xml b/datax-admin/src/main/resources/mybatis-mapper/JobJdbcDatasourceMapper.xml index 3a7b60b0..d8210502 100644 --- a/datax-admin/src/main/resources/mybatis-mapper/JobJdbcDatasourceMapper.xml +++ b/datax-admin/src/main/resources/mybatis-mapper/JobJdbcDatasourceMapper.xml @@ -2,4 +2,11 @@ + + + + + + + \ No newline at end of file diff --git a/datax-rpc/src/main/java/com/wugui/datax/rpc/remoting/net/common/ConnectClient.java b/datax-rpc/src/main/java/com/wugui/datax/rpc/remoting/net/common/ConnectClient.java index 57ca3146..748dabb9 100644 --- a/datax-rpc/src/main/java/com/wugui/datax/rpc/remoting/net/common/ConnectClient.java +++ b/datax-rpc/src/main/java/com/wugui/datax/rpc/remoting/net/common/ConnectClient.java @@ -51,6 +51,7 @@ public abstract class ConnectClient { private static volatile ConcurrentMap connectClientMap; // (static) alread addStopCallBack private static volatile ConcurrentMap connectClientLockMap = new ConcurrentHashMap<>(); + private static ConnectClient getPool(String address, Class connectClientImpl, final XxlRpcReferenceBean xxlRpcReferenceBean) throws Exception { @@ -65,7 +66,7 @@ public abstract class ConnectClient { @Override public void run() throws Exception { if (connectClientMap.size() > 0) { - for (String key: connectClientMap.keySet()) { + for (String key : connectClientMap.keySet()) { ConnectClient clientPool = connectClientMap.get(key); clientPool.close(); } @@ -79,7 +80,7 @@ public abstract class ConnectClient { // get-valid client ConnectClient connectClient = connectClientMap.get(address); - if (connectClient!=null && connectClient.isValidate()) { + if (connectClient != null && connectClient.isValidate()) { return connectClient; } @@ -95,7 +96,7 @@ public abstract class ConnectClient { // get-valid client, avlid repeat connectClient = connectClientMap.get(address); - if (connectClient!=null && connectClient.isValidate()) { + if (connectClient != null && connectClient.isValidate()) { return connectClient; } diff --git a/datax-rpc/src/main/java/com/wugui/datax/rpc/serialize/impl/Hessian1Serializer.java b/datax-rpc/src/main/java/com/wugui/datax/rpc/serialize/impl/Hessian1Serializer.java index 68561216..4d088409 100644 --- a/datax-rpc/src/main/java/com/wugui/datax/rpc/serialize/impl/Hessian1Serializer.java +++ b/datax-rpc/src/main/java/com/wugui/datax/rpc/serialize/impl/Hessian1Serializer.java @@ -1,66 +1,66 @@ -package com.wugui.datax.rpc.serialize.impl; - -import com.caucho.hessian.io.HessianInput; -import com.caucho.hessian.io.HessianOutput; -import com.wugui.datax.rpc.serialize.Serializer; -import com.wugui.datax.rpc.util.XxlRpcException; - -import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; -import java.io.IOException; - -/** - * hessian serialize - * @author xuxueli 2015-9-26 02:53:29 - */ -public class Hessian1Serializer extends Serializer { - - @Override - public byte[] serialize(T obj){ - ByteArrayOutputStream os = new ByteArrayOutputStream(); - HessianOutput ho = new HessianOutput(os); - try { - ho.writeObject(obj); - ho.flush(); - byte[] result = os.toByteArray(); - return result; - } catch (IOException e) { - throw new XxlRpcException(e); - } finally { - try { - ho.close(); - } catch (IOException e) { - throw new XxlRpcException(e); - } - try { - os.close(); - } catch (IOException e) { - throw new XxlRpcException(e); - } - } - } - - @Override - public Object deserialize(byte[] bytes, Class clazz) { - ByteArrayInputStream is = new ByteArrayInputStream(bytes); - HessianInput hi = new HessianInput(is); - try { - Object result = hi.readObject(); - return result; - } catch (IOException e) { - throw new XxlRpcException(e); - } finally { - try { - hi.close(); - } catch (Exception e) { - throw new XxlRpcException(e); - } - try { - is.close(); - } catch (IOException e) { - throw new XxlRpcException(e); - } - } - } - -} +//package com.wugui.datax.rpc.serialize.impl; +// +//import com.caucho.hessian.io.HessianInput; +//import com.caucho.hessian.io.HessianOutput; +//import com.wugui.datax.rpc.serialize.Serializer; +//import com.wugui.datax.rpc.util.XxlRpcException; +// +//import java.io.ByteArrayInputStream; +//import java.io.ByteArrayOutputStream; +//import java.io.IOException; +// +///** +// * hessian serialize +// * @author xuxueli 2015-9-26 02:53:29 +// */ +//public class Hessian1Serializer extends Serializer { +// +// @Override +// public byte[] serialize(T obj){ +// ByteArrayOutputStream os = new ByteArrayOutputStream(); +// HessianOutput ho = new HessianOutput(os); +// try { +// ho.writeObject(obj); +// ho.flush(); +// byte[] result = os.toByteArray(); +// return result; +// } catch (IOException e) { +// throw new XxlRpcException(e); +// } finally { +// try { +// ho.close(); +// } catch (IOException e) { +// throw new XxlRpcException(e); +// } +// try { +// os.close(); +// } catch (IOException e) { +// throw new XxlRpcException(e); +// } +// } +// } +// +// @Override +// public Object deserialize(byte[] bytes, Class clazz) { +// ByteArrayInputStream is = new ByteArrayInputStream(bytes); +// HessianInput hi = new HessianInput(is); +// try { +// Object result = hi.readObject(); +// return result; +// } catch (IOException e) { +// throw new XxlRpcException(e); +// } finally { +// try { +// hi.close(); +// } catch (Exception e) { +// throw new XxlRpcException(e); +// } +// try { +// is.close(); +// } catch (IOException e) { +// throw new XxlRpcException(e); +// } +// } +// } +// +//} diff --git a/datax-rpc/src/main/java/com/wugui/datax/rpc/util/ThreadPoolUtil.java b/datax-rpc/src/main/java/com/wugui/datax/rpc/util/ThreadPoolUtil.java index f11c6a2b..6fb4d421 100644 --- a/datax-rpc/src/main/java/com/wugui/datax/rpc/util/ThreadPoolUtil.java +++ b/datax-rpc/src/main/java/com/wugui/datax/rpc/util/ThreadPoolUtil.java @@ -13,25 +13,17 @@ public class ThreadPoolUtil { * @param serverType * @return */ - public static ThreadPoolExecutor makeServerThreadPool(final String serverType, int corePoolSize, int maxPoolSize){ + public static ThreadPoolExecutor makeServerThreadPool(final String serverType, int corePoolSize, int maxPoolSize) { ThreadPoolExecutor serverHandlerPool = new ThreadPoolExecutor( corePoolSize, maxPoolSize, 60L, TimeUnit.SECONDS, - new LinkedBlockingQueue(1000), - new ThreadFactory() { - @Override - public Thread newThread(Runnable r) { - return new Thread(r, "xxl-rpc, "+serverType+"-serverHandlerPool-" + r.hashCode()); - } - }, - new RejectedExecutionHandler() { - @Override - public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { - throw new XxlRpcException("xxl-rpc "+serverType+" Thread pool is EXHAUSTED!"); - } - }); // default maxThreads 300, minThreads 60 + new LinkedBlockingQueue<>(1000), + r -> new Thread(r, "xxl-rpc, " + serverType + "-serverHandlerPool-" + r.hashCode()), + (r, executor) -> { + throw new XxlRpcException("xxl-rpc " + serverType + " Thread pool is EXHAUSTED!"); + }); // default maxThreads 300, minThreads 60 return serverHandlerPool; } diff --git a/pom.xml b/pom.xml index f963409f..bb8bf463 100644 --- a/pom.xml +++ b/pom.xml @@ -38,7 +38,7 @@ 5.1.47 4.1.0 2.5.8 - 3.0.7.1 + 3.3.1 2.9.2 1.5.21 4.3.25.RELEASE