Merge pull request #6 from WeiYe-Jing/dev

Dev
This commit is contained in:
water 2020-03-18 10:26:39 +08:00 committed by GitHub
commit 565b93c16f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
15 changed files with 248 additions and 97 deletions

View File

@ -38,7 +38,9 @@ public class ControllerAspect {
String method = signature.getDeclaringTypeName().replace(clsPath, "") + "." + signature.getName();
long start = System.currentTimeMillis();
String input = Arrays.toString(joinPoint.getArgs());
log.info("controller:{},input:{}", method, input);
if (!method.contains("JobApiController")) {
log.info("controller:{},input:{}", method, input);
}
try {
Object resObj = joinPoint.proceed();
return resObj;

View File

@ -1,7 +1,7 @@
package com.wugui.datax.admin.config;
import com.baomidou.mybatisplus.core.injector.DefaultSqlInjector;
import com.baomidou.mybatisplus.core.injector.ISqlInjector;
import com.baomidou.mybatisplus.extension.injector.LogicSqlInjector;
import com.baomidou.mybatisplus.extension.plugins.PaginationInterceptor;
import org.mybatis.spring.annotation.MapperScan;
import org.springframework.context.annotation.Bean;
@ -10,6 +10,7 @@ import org.springframework.transaction.annotation.EnableTransactionManagement;
/**
* MybatisPlus配置类 Spring boot方式
*
* @author huzekang
*/
@EnableTransactionManagement
@ -29,11 +30,15 @@ public class MybatisPlusConfig {
/**
* MyBatisPlus逻辑删除 需要在 yml 中配置开启
* 3.0.7.1版本的LogicSqlInjector里面什么都没做只是 extends DefaultSqlInjector
* 以后版本直接去的了LogicSqlInjector
*
* @return
*/
@Bean
public ISqlInjector sqlInjector() {
return new LogicSqlInjector();
return new DefaultSqlInjector();
}
}

View File

@ -0,0 +1,44 @@
package com.wugui.datax.admin.core.handler;
import com.wugui.datax.admin.util.AESUtil;
import org.apache.ibatis.type.BaseTypeHandler;
import org.apache.ibatis.type.JdbcType;
import org.apache.ibatis.type.MappedTypes;
import java.sql.CallableStatement;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
/**
* @author water
* @date 20-03-17 下午5:38
*/
@MappedTypes({String.class})
public class AESEncryptHandler extends BaseTypeHandler<String> {
@Override
public void setNonNullParameter(PreparedStatement ps, int i, String parameter, JdbcType jdbcType) throws SQLException {
ps.setString(i, AESUtil.encrypt(parameter));
}
@Override
public String getNullableResult(ResultSet rs, String columnName) throws SQLException {
String columnValue = rs.getString(columnName);
return AESUtil.decrypt(columnValue);
}
@Override
public String getNullableResult(ResultSet rs, int columnIndex) throws SQLException {
String columnValue = rs.getString(columnIndex);
return AESUtil.decrypt(columnValue);
}
@Override
public String getNullableResult(CallableStatement cs, int columnIndex)
throws SQLException {
String columnValue = cs.getString(columnIndex);
return AESUtil.decrypt(columnValue);
}
}

View File

@ -1,4 +1,4 @@
package com.wugui.datax.admin.config;
package com.wugui.datax.admin.core.handler;
import com.baomidou.mybatisplus.core.handlers.MetaObjectHandler;
import lombok.extern.slf4j.Slf4j;

View File

@ -3,6 +3,7 @@ package com.wugui.datax.admin.entity;
import com.alibaba.fastjson.annotation.JSONField;
import com.baomidou.mybatisplus.annotation.*;
import com.baomidou.mybatisplus.extension.activerecord.Model;
import com.wugui.datax.admin.core.handler.AESEncryptHandler;
import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
import lombok.Data;
@ -50,13 +51,17 @@ public class JobJdbcDatasource extends Model<JobJdbcDatasource> {
/**
* 用户名
* AESEncryptHandler 加密类
* MyBatis Plus 3.0.7.1之前版本没有typeHandler属性需要升级到最低3.1.2
*/
@ApiModelProperty(value = "用户名")
@TableField(typeHandler = AESEncryptHandler.class)
private String jdbcUsername;
/**
* 密码
*/
@TableField(typeHandler = AESEncryptHandler.class)
@ApiModelProperty(value = "密码")
private String jdbcPassword;

View File

@ -7,6 +7,7 @@ import com.wugui.datax.admin.entity.JobJdbcDatasource;
import com.wugui.datax.admin.tool.datax.BaseDataxPlugin;
import com.wugui.datax.admin.tool.pojo.DataxHivePojo;
import com.wugui.datax.admin.tool.pojo.DataxRdbmsPojo;
import com.wugui.datax.admin.util.AESUtil;
import org.apache.commons.lang3.StringUtils;
import java.util.Map;
@ -26,15 +27,13 @@ public abstract class BaseReaderPlugin extends BaseDataxPlugin {
public Map<String, Object> build(DataxRdbmsPojo plugin) {
//构建
Map<String, Object> readerObj = Maps.newLinkedHashMap();
readerObj.put("name", getName());
//
Map<String, Object> parameterObj = Maps.newLinkedHashMap();
Map<String, Object> connectionObj = Maps.newLinkedHashMap();
JobJdbcDatasource jobJdbcDatasource = plugin.getJdbcDatasource();
parameterObj.put("username", jobJdbcDatasource.getJdbcUsername());
parameterObj.put("password", jobJdbcDatasource.getJdbcPassword());
parameterObj.put("username", AESUtil.decrypt(jobJdbcDatasource.getJdbcUsername()));
parameterObj.put("password", AESUtil.decrypt(jobJdbcDatasource.getJdbcPassword()));
//判断是否是 querySql
if (StrUtil.isNotBlank(plugin.getQuerySql())) {

View File

@ -6,6 +6,7 @@ import com.wugui.datax.admin.entity.JobJdbcDatasource;
import com.wugui.datax.admin.tool.datax.BaseDataxPlugin;
import com.wugui.datax.admin.tool.pojo.DataxHivePojo;
import com.wugui.datax.admin.tool.pojo.DataxRdbmsPojo;
import com.wugui.datax.admin.util.AESUtil;
import java.util.Map;
@ -26,8 +27,8 @@ public abstract class BaseWriterPlugin extends BaseDataxPlugin {
Map<String, Object> parameterObj = Maps.newLinkedHashMap();
// parameterObj.put("writeMode", "insert");
JobJdbcDatasource jobJdbcDatasource = plugin.getJdbcDatasource();
parameterObj.put("username", jobJdbcDatasource.getJdbcUsername());
parameterObj.put("password", jobJdbcDatasource.getJdbcPassword());
parameterObj.put("username", AESUtil.decrypt(jobJdbcDatasource.getJdbcUsername()));
parameterObj.put("password", AESUtil.decrypt(jobJdbcDatasource.getJdbcPassword()));
parameterObj.put("column", plugin.getRdbmsColumns());
// preSql
parameterObj.put("preSql", ImmutableList.of(plugin.getPreSql()));

View File

@ -13,6 +13,7 @@ import com.wugui.datax.admin.tool.database.DasColumn;
import com.wugui.datax.admin.tool.database.TableInfo;
import com.wugui.datax.admin.tool.meta.DatabaseInterface;
import com.wugui.datax.admin.tool.meta.DatabaseMetaFactory;
import com.wugui.datax.admin.util.AESUtil;
import com.zaxxer.hikari.HikariDataSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -54,11 +55,12 @@ public abstract class BaseQueryTool implements QueryToolInterface {
*/
BaseQueryTool(JobJdbcDatasource jobJdbcDatasource) throws SQLException {
String currentDbType = JdbcUtils.getDbType(jobJdbcDatasource.getJdbcUrl(), jobJdbcDatasource.getJdbcDriverClass());
String userName = AESUtil.decrypt(jobJdbcDatasource.getJdbcUsername());
if (LocalCacheUtil.get(jobJdbcDatasource.getDatasourceName()) == null) {
//这里默认使用 hikari 数据源
HikariDataSource dataSource = new HikariDataSource();
dataSource.setUsername(jobJdbcDatasource.getJdbcUsername());
dataSource.setPassword(jobJdbcDatasource.getJdbcPassword());
dataSource.setUsername(userName);
dataSource.setPassword(AESUtil.decrypt(jobJdbcDatasource.getJdbcPassword()));
dataSource.setJdbcUrl(jobJdbcDatasource.getJdbcUrl());
dataSource.setDriverClassName(jobJdbcDatasource.getJdbcDriverClass());
dataSource.setMaximumPoolSize(1);
@ -70,7 +72,7 @@ public abstract class BaseQueryTool implements QueryToolInterface {
this.connection = (Connection) LocalCacheUtil.get(jobJdbcDatasource.getDatasourceName());
}
sqlBuilder = DatabaseMetaFactory.getByDbType(currentDbType);
currentSchema = getSchema(jobJdbcDatasource.getJdbcUsername());
currentSchema = getSchema(userName);
LocalCacheUtil.set(jobJdbcDatasource.getDatasourceName(), this.connection, 4 * 60 * 60 * 1000);
}

View File

@ -0,0 +1,92 @@
package com.wugui.datax.admin.util;
import javax.crypto.Cipher;
import javax.crypto.spec.IvParameterSpec;
import javax.crypto.spec.SecretKeySpec;
import org.apache.commons.codec.binary.Base64;
public class AESUtil {
// 密钥
public static String key = "AD42F6697B035B7580E4FEF93BE20BAD";
private static String charset = "utf-8";
// 偏移量
private static int offset = 16;
private static String transformation = "AES/CBC/PKCS5Padding";
private static String algorithm = "AES";
/**
* 加密
*
* @param content
* @return
*/
public static String encrypt(String content) {
return encrypt(content, key);
}
/**
* 解密
*
* @param content
* @return
*/
public static String decrypt(String content) {
return decrypt(content, key);
}
/**
* 加密
*
* @param content 需要加密的内容
* @param key 加密密码
* @return
*/
public static String encrypt(String content, String key) {
try {
SecretKeySpec skey = new SecretKeySpec(key.getBytes(), algorithm);
IvParameterSpec iv = new IvParameterSpec(key.getBytes(), 0, offset);
Cipher cipher = Cipher.getInstance(transformation);
byte[] byteContent = content.getBytes(charset);
cipher.init(Cipher.ENCRYPT_MODE, skey, iv);// 初始化
byte[] result = cipher.doFinal(byteContent);
return new Base64().encodeToString(result); // 加密
} catch (Exception e) {
// LogUtil.exception(e);
}
return null;
}
/**
* AES256解密
*
* @param content 待解密内容
* @param key 解密密钥
* @return 解密之后
* @throws Exception
*/
public static String decrypt(String content, String key) {
try {
SecretKeySpec skey = new SecretKeySpec(key.getBytes(), algorithm);
IvParameterSpec iv = new IvParameterSpec(key.getBytes(), 0, offset);
Cipher cipher = Cipher.getInstance(transformation);
cipher.init(Cipher.DECRYPT_MODE, skey, iv);// 初始化
byte[] result = cipher.doFinal(new Base64().decode(content));
return new String(result); // 解密
} catch (Exception e) {
//LogUtil.exception(e);
}
return null;
}
public static void main(String[] args) throws Exception {
String s = "root";
// 加密
System.out.println("加密前:" + s);
String encryptResultStr = encrypt(s);
System.out.println("加密后:" + encryptResultStr);
// 解密
System.out.println("解密后:" + decrypt(encryptResultStr));
}
}

View File

@ -77,6 +77,7 @@ mybatis-plus:
cache-enabled: false
call-setters-on-nulls: true
jdbc-type-for-null: 'null'
type-handlers-package: com.wugui.datax.admin.core.handler
# 配置mybatis-plus打印slq日志
logging:

View File

@ -2,4 +2,11 @@
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.wugui.datax.admin.mapper.JobJdbcDatasourceMapper">
<!--<resultMap type="com.wugui.datax.admin.entity.JobJdbcDatasource" id="jobJdbcDatasource">-->
<!--<result property="jdbcUsername" column="jdbc_username" jdbcType="VARCHAR" javaType="String"-->
<!--typeHandler="com.wugui.datax.admin.core.util.AESEncryptHandler"/>-->
<!--<result property="jdbcPassword" column="jdbc_password" jdbcType="VARCHAR" javaType="String"-->
<!--typeHandler="com.wugui.datax.admin.core.util.AESEncryptHandler"/>-->
<!--</resultMap>-->
</mapper>

View File

@ -51,6 +51,7 @@ public abstract class ConnectClient {
private static volatile ConcurrentMap<String, ConnectClient> connectClientMap; // (static) alread addStopCallBack
private static volatile ConcurrentMap<String, Object> connectClientLockMap = new ConcurrentHashMap<>();
private static ConnectClient getPool(String address, Class<? extends ConnectClient> connectClientImpl,
final XxlRpcReferenceBean xxlRpcReferenceBean) throws Exception {
@ -65,7 +66,7 @@ public abstract class ConnectClient {
@Override
public void run() throws Exception {
if (connectClientMap.size() > 0) {
for (String key: connectClientMap.keySet()) {
for (String key : connectClientMap.keySet()) {
ConnectClient clientPool = connectClientMap.get(key);
clientPool.close();
}
@ -79,7 +80,7 @@ public abstract class ConnectClient {
// get-valid client
ConnectClient connectClient = connectClientMap.get(address);
if (connectClient!=null && connectClient.isValidate()) {
if (connectClient != null && connectClient.isValidate()) {
return connectClient;
}
@ -95,7 +96,7 @@ public abstract class ConnectClient {
// get-valid client, avlid repeat
connectClient = connectClientMap.get(address);
if (connectClient!=null && connectClient.isValidate()) {
if (connectClient != null && connectClient.isValidate()) {
return connectClient;
}

View File

@ -1,66 +1,66 @@
package com.wugui.datax.rpc.serialize.impl;
import com.caucho.hessian.io.HessianInput;
import com.caucho.hessian.io.HessianOutput;
import com.wugui.datax.rpc.serialize.Serializer;
import com.wugui.datax.rpc.util.XxlRpcException;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
/**
* hessian serialize
* @author xuxueli 2015-9-26 02:53:29
*/
public class Hessian1Serializer extends Serializer {
@Override
public <T> byte[] serialize(T obj){
ByteArrayOutputStream os = new ByteArrayOutputStream();
HessianOutput ho = new HessianOutput(os);
try {
ho.writeObject(obj);
ho.flush();
byte[] result = os.toByteArray();
return result;
} catch (IOException e) {
throw new XxlRpcException(e);
} finally {
try {
ho.close();
} catch (IOException e) {
throw new XxlRpcException(e);
}
try {
os.close();
} catch (IOException e) {
throw new XxlRpcException(e);
}
}
}
@Override
public <T> Object deserialize(byte[] bytes, Class<T> clazz) {
ByteArrayInputStream is = new ByteArrayInputStream(bytes);
HessianInput hi = new HessianInput(is);
try {
Object result = hi.readObject();
return result;
} catch (IOException e) {
throw new XxlRpcException(e);
} finally {
try {
hi.close();
} catch (Exception e) {
throw new XxlRpcException(e);
}
try {
is.close();
} catch (IOException e) {
throw new XxlRpcException(e);
}
}
}
}
//package com.wugui.datax.rpc.serialize.impl;
//
//import com.caucho.hessian.io.HessianInput;
//import com.caucho.hessian.io.HessianOutput;
//import com.wugui.datax.rpc.serialize.Serializer;
//import com.wugui.datax.rpc.util.XxlRpcException;
//
//import java.io.ByteArrayInputStream;
//import java.io.ByteArrayOutputStream;
//import java.io.IOException;
//
///**
// * hessian serialize
// * @author xuxueli 2015-9-26 02:53:29
// */
//public class Hessian1Serializer extends Serializer {
//
// @Override
// public <T> byte[] serialize(T obj){
// ByteArrayOutputStream os = new ByteArrayOutputStream();
// HessianOutput ho = new HessianOutput(os);
// try {
// ho.writeObject(obj);
// ho.flush();
// byte[] result = os.toByteArray();
// return result;
// } catch (IOException e) {
// throw new XxlRpcException(e);
// } finally {
// try {
// ho.close();
// } catch (IOException e) {
// throw new XxlRpcException(e);
// }
// try {
// os.close();
// } catch (IOException e) {
// throw new XxlRpcException(e);
// }
// }
// }
//
// @Override
// public <T> Object deserialize(byte[] bytes, Class<T> clazz) {
// ByteArrayInputStream is = new ByteArrayInputStream(bytes);
// HessianInput hi = new HessianInput(is);
// try {
// Object result = hi.readObject();
// return result;
// } catch (IOException e) {
// throw new XxlRpcException(e);
// } finally {
// try {
// hi.close();
// } catch (Exception e) {
// throw new XxlRpcException(e);
// }
// try {
// is.close();
// } catch (IOException e) {
// throw new XxlRpcException(e);
// }
// }
// }
//
//}

View File

@ -13,25 +13,17 @@ public class ThreadPoolUtil {
* @param serverType
* @return
*/
public static ThreadPoolExecutor makeServerThreadPool(final String serverType, int corePoolSize, int maxPoolSize){
public static ThreadPoolExecutor makeServerThreadPool(final String serverType, int corePoolSize, int maxPoolSize) {
ThreadPoolExecutor serverHandlerPool = new ThreadPoolExecutor(
corePoolSize,
maxPoolSize,
60L,
TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>(1000),
new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "xxl-rpc, "+serverType+"-serverHandlerPool-" + r.hashCode());
}
},
new RejectedExecutionHandler() {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
throw new XxlRpcException("xxl-rpc "+serverType+" Thread pool is EXHAUSTED!");
}
}); // default maxThreads 300, minThreads 60
new LinkedBlockingQueue<>(1000),
r -> new Thread(r, "xxl-rpc, " + serverType + "-serverHandlerPool-" + r.hashCode()),
(r, executor) -> {
throw new XxlRpcException("xxl-rpc " + serverType + " Thread pool is EXHAUSTED!");
}); // default maxThreads 300, minThreads 60
return serverHandlerPool;
}

View File

@ -38,7 +38,7 @@
<mysql-connector.version>5.1.47</mysql-connector.version>
<jna.version>4.1.0</jna.version>
<groovy.version>2.5.8</groovy.version>
<mybatisplus.version>3.0.7.1</mybatisplus.version>
<mybatisplus.version>3.3.1</mybatisplus.version>
<swagger.version>2.9.2</swagger.version>
<swagger-models.version>1.5.21</swagger-models.version>
<spring.version>4.3.25.RELEASE</spring.version>