diff --git a/milvuswriter/pom.xml b/milvuswriter/pom.xml new file mode 100644 index 00000000..b889cbe4 --- /dev/null +++ b/milvuswriter/pom.xml @@ -0,0 +1,109 @@ + + + 4.0.0 + + com.alibaba.datax + datax-all + 0.0.1-SNAPSHOT + + + milvuswriter + + + UTF-8 + official + 1.8 + + + + + guava + com.google.guava + 32.0.1-jre + + + + + + + io.milvus + milvus-sdk-java + 2.4.8 + + + org.jetbrains.kotlin + kotlin-test-junit5 + 2.0.0 + test + + + org.junit.jupiter + junit-jupiter + 5.10.0 + test + + + org.jetbrains.kotlin + kotlin-stdlib + 2.0.0 + + + com.alibaba.datax + datax-common + 0.0.1-SNAPSHOT + compile + + + org.projectlombok + lombok + 1.18.30 + provided + + + + + + + + src/main/resources + + **/*.* + + true + + + + + + maven-compiler-plugin + + ${jdk-version} + ${jdk-version} + ${project-sourceEncoding} + + + + + maven-assembly-plugin + + + src/main/assembly/package.xml + + datax + + + + dwzip + package + + single + + + + + + + + \ No newline at end of file diff --git a/milvuswriter/src/main/assembly/package.xml b/milvuswriter/src/main/assembly/package.xml new file mode 100644 index 00000000..62357b4a --- /dev/null +++ b/milvuswriter/src/main/assembly/package.xml @@ -0,0 +1,36 @@ + + + + + dir + + false + + + src/main/resources + + plugin.json + plugin_job_template.json + + plugin/writer/milvuswriter + + + target/ + + milvuswriter-0.0.1-SNAPSHOT.jar + + plugin/writer/milvuswriter + + + + + + false + plugin/writer/milvuswriter/libs + runtime + + + diff --git a/milvuswriter/src/main/java/com/alibaba/datax/plugin/writer/milvuswriter/BufferUtils.java b/milvuswriter/src/main/java/com/alibaba/datax/plugin/writer/milvuswriter/BufferUtils.java new file mode 100644 index 00000000..89153a6a --- /dev/null +++ b/milvuswriter/src/main/java/com/alibaba/datax/plugin/writer/milvuswriter/BufferUtils.java @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.datax.plugin.writer.milvuswriter; + +import java.nio.Buffer; +import java.nio.ByteBuffer; + +public class BufferUtils { + + public static ByteBuffer toByteBuffer(Short[] shortArray) { + ByteBuffer byteBuffer = ByteBuffer.allocate(shortArray.length * 2); + + for (Short value : shortArray) { + byteBuffer.putShort(value); + } + + // Compatible compilation and running versions are not consistent + // Flip the buffer to prepare for reading + ((Buffer) byteBuffer).flip(); + + return byteBuffer; + } + + public static Short[] toShortArray(ByteBuffer byteBuffer) { + Short[] shortArray = new Short[byteBuffer.capacity() / 2]; + + for (int i = 0; i < shortArray.length; i++) { + shortArray[i] = byteBuffer.getShort(); + } + + return shortArray; + } + + public static ByteBuffer toByteBuffer(Float[] floatArray) { + ByteBuffer byteBuffer = ByteBuffer.allocate(floatArray.length * 4); + + for (float value : floatArray) { + byteBuffer.putFloat(value); + } + + ((Buffer) byteBuffer).flip(); + + return byteBuffer; + } + + public static Float[] toFloatArray(ByteBuffer byteBuffer) { + Float[] floatArray = new Float[byteBuffer.capacity() / 4]; + + for (int i = 0; i < floatArray.length; i++) { + floatArray[i] = byteBuffer.getFloat(); + } + + return floatArray; + } + + public static ByteBuffer toByteBuffer(Double[] doubleArray) { + ByteBuffer byteBuffer = ByteBuffer.allocate(doubleArray.length * 8); + + for (double value : doubleArray) { + byteBuffer.putDouble(value); + } + + ((Buffer) byteBuffer).flip(); + + return byteBuffer; + } + + public static Double[] toDoubleArray(ByteBuffer byteBuffer) { + Double[] doubleArray = new Double[byteBuffer.capacity() / 8]; + + for (int i = 0; i < doubleArray.length; i++) { + doubleArray[i] = byteBuffer.getDouble(); + } + + return doubleArray; + } + + public static ByteBuffer toByteBuffer(Integer[] intArray) { + ByteBuffer byteBuffer = ByteBuffer.allocate(intArray.length * 4); + + for (int value : intArray) { + byteBuffer.putInt(value); + } + + ((Buffer) byteBuffer).flip(); + + return byteBuffer; + } + + public static Integer[] toIntArray(ByteBuffer byteBuffer) { + Integer[] intArray = new Integer[byteBuffer.capacity() / 4]; + + for (int i = 0; i < intArray.length; i++) { + intArray[i] = byteBuffer.getInt(); + } + + return intArray; + } +} diff --git a/milvuswriter/src/main/java/com/alibaba/datax/plugin/writer/milvuswriter/KeyConstant.java b/milvuswriter/src/main/java/com/alibaba/datax/plugin/writer/milvuswriter/KeyConstant.java new file mode 100644 index 00000000..cc636404 --- /dev/null +++ b/milvuswriter/src/main/java/com/alibaba/datax/plugin/writer/milvuswriter/KeyConstant.java @@ -0,0 +1,20 @@ +package com.alibaba.datax.plugin.writer.milvuswriter; + +public class KeyConstant { + public static final String URI = "uri"; + public static final String TOKEN = "token"; + public static final String DATABASE = "database"; + public static final String COLLECTION = "collection"; + public static final String AUTO_ID = "autoId"; + public static final String ENABLE_DYNAMIC_SCHEMA = "enableDynamicSchema"; + public static final String BATCH_SIZE = "batchSize"; + public static final String COLUMN = "column"; + public static final String COLUMN_TYPE = "type"; + public static final String COLUMN_NAME = "name"; + public static final String VECTOR_DIMENSION = "dimension"; + public static final String IS_PRIMARY_KEY = "isPrimaryKey"; +// "schemaCreateMode":"createWhenTableNotExit"/"Ignore"/"exception" + public static final String schemaCreateMode = "schemaCreateMode"; + public static final String IS_PARTITION_KEY = "isPartitionKey"; + public static final String MAX_LENGTH = "maxLength"; +} diff --git a/milvuswriter/src/main/java/com/alibaba/datax/plugin/writer/milvuswriter/MilvusBufferWriter.java b/milvuswriter/src/main/java/com/alibaba/datax/plugin/writer/milvuswriter/MilvusBufferWriter.java new file mode 100644 index 00000000..da686af2 --- /dev/null +++ b/milvuswriter/src/main/java/com/alibaba/datax/plugin/writer/milvuswriter/MilvusBufferWriter.java @@ -0,0 +1,43 @@ +package com.alibaba.datax.plugin.writer.milvuswriter; + +import com.google.gson.JsonObject; +import io.milvus.v2.client.MilvusClientV2; +import io.milvus.v2.service.vector.request.UpsertReq; +import lombok.extern.slf4j.Slf4j; + +import java.util.ArrayList; +import java.util.List; + +@Slf4j +public class MilvusBufferWriter { + + private final MilvusClientV2 milvusClientV2; + private final String collection; + private final Integer batchSize; + private List dataCache; + + public MilvusBufferWriter(MilvusClientV2 milvusClientV2, String collection, Integer batchSize){ + this.milvusClientV2 = milvusClientV2; + this.collection = collection; + this.batchSize = batchSize; + this.dataCache = new ArrayList<>(); + } + public void write(JsonObject data){ + dataCache.add(data); + } + public Boolean needCommit(){ + return dataCache.size() >= batchSize; + } + public void commit(){ + if(dataCache.isEmpty()){ + log.info("dataCache is empty, skip commit"); + return; + } + UpsertReq upsertReq = UpsertReq.builder() + .collectionName(collection) + .data(dataCache) + .build(); + milvusClientV2.upsert(upsertReq); + dataCache = new ArrayList<>(); + } +} diff --git a/milvuswriter/src/main/java/com/alibaba/datax/plugin/writer/milvuswriter/MilvusSinkConverter.java b/milvuswriter/src/main/java/com/alibaba/datax/plugin/writer/milvuswriter/MilvusSinkConverter.java new file mode 100644 index 00000000..390f95e5 --- /dev/null +++ b/milvuswriter/src/main/java/com/alibaba/datax/plugin/writer/milvuswriter/MilvusSinkConverter.java @@ -0,0 +1,86 @@ +package com.alibaba.datax.plugin.writer.milvuswriter; + +import com.alibaba.datax.common.element.Record; +import com.alibaba.fastjson2.JSONArray; +import com.google.gson.Gson; +import com.google.gson.JsonObject; +import com.google.gson.JsonParser; +import io.milvus.v2.common.DataType; +import static io.milvus.v2.common.DataType.*; +import io.milvus.v2.service.collection.request.AddFieldReq; +import io.milvus.v2.service.collection.request.CreateCollectionReq; + +import java.nio.ByteBuffer; +import java.util.Arrays; +import java.util.stream.Collectors; + +public class MilvusSinkConverter { + public JsonObject convertByType(JSONArray milvusColumnMeta, Record record) { + JsonObject data = new JsonObject(); + Gson gson = new Gson(); + for(int i = 0; i < record.getColumnNumber(); i++) { + String fieldType = milvusColumnMeta.getJSONObject(i).getString(KeyConstant.COLUMN_TYPE); + String fieldName = milvusColumnMeta.getJSONObject(i).getString(KeyConstant.COLUMN_NAME); + Object rawData = record.getColumn(i).getRawData(); + Object field = convertToMilvusField(fieldType, rawData); + data.add(fieldName, gson.toJsonTree(field)); + } + return data; + } + + private Object convertToMilvusField(String type, Object rawData) { + Gson gson = new Gson(); + switch (valueOf(type)) { + case Int32: + return Integer.parseInt(rawData.toString()); + case Int64: + return Long.parseLong(rawData.toString()); + case Float: + return java.lang.Float.parseFloat(rawData.toString()); + case String: + case VarChar: + return rawData.toString(); + case Bool: + return Boolean.parseBoolean(rawData.toString()); + case FloatVector: + java.lang.Float[] floats = Arrays.stream(rawData.toString().split(",")).map(java.lang.Float::parseFloat).toArray(java.lang.Float[]::new); + return Arrays.stream(floats).collect(Collectors.toList()); + case BinaryVector: + java.lang.Integer[] binarys = Arrays.stream(rawData.toString().split(",")).map(java.lang.Integer::parseInt).toArray(java.lang.Integer[]::new); + return BufferUtils.toByteBuffer(binarys); + case Float16Vector: + case BFloat16Vector: + // all these data is byte format in milvus + ByteBuffer binaryVector = (ByteBuffer) rawData; + return gson.toJsonTree(binaryVector.array()); + case SparseFloatVector: + return JsonParser.parseString(gson.toJson(rawData)).getAsJsonObject(); + default: + throw new RuntimeException("Unsupported data type"); + } + } + + public CreateCollectionReq.CollectionSchema prepareCollectionSchema(JSONArray milvusColumnMeta) { + CreateCollectionReq.CollectionSchema collectionSchema = CreateCollectionReq.CollectionSchema.builder().build(); + for (int i = 0; i < milvusColumnMeta.size(); i++) { + AddFieldReq addFieldReq = AddFieldReq.builder() + .fieldName(milvusColumnMeta.getJSONObject(i).getString(KeyConstant.COLUMN_NAME)) + .dataType(valueOf(milvusColumnMeta.getJSONObject(i).getString(KeyConstant.COLUMN_TYPE))) + .build(); + if(milvusColumnMeta.getJSONObject(i).containsKey(KeyConstant.IS_PRIMARY_KEY)) { + addFieldReq.setIsPrimaryKey(milvusColumnMeta.getJSONObject(i).getBoolean(KeyConstant.IS_PRIMARY_KEY)); + } + if(milvusColumnMeta.getJSONObject(i).containsKey(KeyConstant.VECTOR_DIMENSION)) { + addFieldReq.setDimension(milvusColumnMeta.getJSONObject(i).getInteger(KeyConstant.VECTOR_DIMENSION)); + } + if(milvusColumnMeta.getJSONObject(i).containsKey(KeyConstant.IS_PARTITION_KEY)) { + addFieldReq.setIsPartitionKey(milvusColumnMeta.getJSONObject(i).getBoolean(KeyConstant.IS_PARTITION_KEY)); + } + if(milvusColumnMeta.getJSONObject(i).containsKey(KeyConstant.MAX_LENGTH)) { + addFieldReq.setMaxLength(milvusColumnMeta.getJSONObject(i).getInteger(KeyConstant.MAX_LENGTH)); + } + collectionSchema.addField(addFieldReq); + } + return collectionSchema; + } +} diff --git a/milvuswriter/src/main/java/com/alibaba/datax/plugin/writer/milvuswriter/MilvusWriter.java b/milvuswriter/src/main/java/com/alibaba/datax/plugin/writer/milvuswriter/MilvusWriter.java new file mode 100644 index 00000000..c9b5a1bc --- /dev/null +++ b/milvuswriter/src/main/java/com/alibaba/datax/plugin/writer/milvuswriter/MilvusWriter.java @@ -0,0 +1,130 @@ +package com.alibaba.datax.plugin.writer.milvuswriter; + +import com.alibaba.datax.common.element.Record; +import com.alibaba.datax.common.plugin.RecordReceiver; +import com.alibaba.datax.common.spi.Writer; +import com.alibaba.datax.common.util.Configuration; +import com.alibaba.fastjson2.JSON; +import com.alibaba.fastjson2.JSONArray; +import com.google.gson.JsonObject; +import io.milvus.v2.client.ConnectConfig; +import io.milvus.v2.client.MilvusClientV2; +import io.milvus.v2.common.DataType; +import io.milvus.v2.service.collection.request.AddFieldReq; +import io.milvus.v2.service.collection.request.CreateCollectionReq; +import io.milvus.v2.service.collection.request.HasCollectionReq; +import io.milvus.v2.service.vector.request.UpsertReq; +import lombok.extern.slf4j.Slf4j; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +@Slf4j +public class MilvusWriter extends Writer { + public static class Job extends Writer.Job { + private Configuration originalConfig = null; + /** + * 切分任务。
+ * + * @param mandatoryNumber 为了做到Reader、Writer任务数对等,这里要求Writer插件必须按照源端的切分数进行切分。否则框架报错! + */ + @Override + public List split(int mandatoryNumber) { + List configList = new ArrayList(); + for(int i = 0; i < mandatoryNumber; i++) { + configList.add(this.originalConfig.clone()); + } + return configList; + } + + @Override + public void init() { + this.originalConfig = super.getPluginJobConf(); + } + + @Override + public void destroy() { + + } + } + public static class Task extends Writer.Task { + + private MilvusClientV2 milvusClientV2; + + private MilvusSinkConverter milvusSinkConverter; + private MilvusBufferWriter milvusBufferWriter; + + private String collection = null; + private JSONArray milvusColumnMeta; + + private String schemaCreateMode = "createWhenTableNotExit"; + + @Override + public void startWrite(RecordReceiver lineReceiver) { + Record record = lineReceiver.getFromReader(); + JsonObject data = milvusSinkConverter.convertByType(milvusColumnMeta, record); + milvusBufferWriter.write(data); + if(milvusBufferWriter.needCommit()){ + log.info("Reached buffer limit, Committing data"); + milvusBufferWriter.commit(); + log.info("Data committed"); + } + } + + @Override + public void init() { + log.info("Initializing Milvus writer"); + // get configuration + Configuration writerSliceConfig = this.getPluginJobConf(); + this.collection = writerSliceConfig.getString(KeyConstant.COLLECTION); + this.milvusColumnMeta = JSON.parseArray(writerSliceConfig.getString(KeyConstant.COLUMN)); + this.schemaCreateMode = writerSliceConfig.getString(KeyConstant.schemaCreateMode) == null ? + "createWhenTableNotExit" : writerSliceConfig.getString(KeyConstant.schemaCreateMode); + int batchSize = writerSliceConfig.getInt(KeyConstant.BATCH_SIZE, 100); + log.info("Collection:{}", this.collection); + // connect to milvus + ConnectConfig connectConfig = ConnectConfig.builder() + .uri(writerSliceConfig.getString(KeyConstant.URI)) + .token(writerSliceConfig.getString(KeyConstant.TOKEN)) + .build(); + if(writerSliceConfig.getString(KeyConstant.DATABASE) == null) { + log.warn("Database is set, using database{}", writerSliceConfig.getString(KeyConstant.DATABASE)); + connectConfig.setDbName(writerSliceConfig.getString(KeyConstant.DATABASE)); + } + this.milvusClientV2 = new MilvusClientV2(connectConfig); + this.milvusSinkConverter = new MilvusSinkConverter(); + this.milvusBufferWriter = new MilvusBufferWriter(milvusClientV2, collection, batchSize); + log.info("Milvus writer initialized"); + } + @Override + public void prepare() { + super.prepare(); + Boolean hasCollection = milvusClientV2.hasCollection(HasCollectionReq.builder().collectionName(collection).build()); + if (!hasCollection) { + log.info("Collection not exist"); + if (schemaCreateMode.equals("createWhenTableNotExit")) { + // create collection + log.info("Creating collection:{}", this.collection); + CreateCollectionReq.CollectionSchema collectionSchema = milvusSinkConverter.prepareCollectionSchema(milvusColumnMeta); + + CreateCollectionReq createCollectionReq = CreateCollectionReq.builder() + .collectionName(collection) + .collectionSchema(collectionSchema) + .build(); + milvusClientV2.createCollection(createCollectionReq); + } else if (schemaCreateMode.equals("exception")) { + log.error("Collection not exist, throw exception"); + throw new RuntimeException("Collection not exist"); + } + } + } + + @Override + public void destroy() { + log.info("Closing Milvus writer, committing data and closing connection"); + this.milvusBufferWriter.commit(); + this.milvusClientV2.close(); + } + } +} diff --git a/milvuswriter/src/main/resources/plugin.json b/milvuswriter/src/main/resources/plugin.json new file mode 100644 index 00000000..8b912309 --- /dev/null +++ b/milvuswriter/src/main/resources/plugin.json @@ -0,0 +1,6 @@ +{ + "name": "milvuswriter", + "class": "com.alibaba.datax.plugin.writer.milvuswriter.MilvusWriter", + "description": "useScene: prod. mechanism: via milvusclient connect milvus write data concurrent.", + "developer": "nianliuu" +} diff --git a/milvuswriter/src/main/resources/plugin_job_template.json b/milvuswriter/src/main/resources/plugin_job_template.json new file mode 100644 index 00000000..d4ba4bf1 --- /dev/null +++ b/milvuswriter/src/main/resources/plugin_job_template.json @@ -0,0 +1,15 @@ +{ + "name": "mongodbwriter", + "parameter": { + "address": [], + "userName": "", + "userPassword": "", + "dbName": "", + "collectionName": "", + "column": [], + "upsertInfo": { + "isUpsert": "", + "upsertKey": "" + } + } +} \ No newline at end of file diff --git a/pom.xml b/pom.xml index c7f43f17..1b364a75 100644 --- a/pom.xml +++ b/pom.xml @@ -129,6 +129,7 @@ adbmysqlwriter sybasewriter neo4jwriter + milvuswriter plugin-rdbms-util plugin-unstructured-storage-util