add milvus writer plugin

This commit is contained in:
Nian Liu 2024-11-27 17:52:09 +08:00
parent 3614c2633e
commit 906bf3ded9
No known key found for this signature in database
GPG Key ID: D6A98BE269ACD54D
10 changed files with 576 additions and 0 deletions

109
milvuswriter/pom.xml Normal file
View File

@ -0,0 +1,109 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>com.alibaba.datax</groupId>
<artifactId>datax-all</artifactId>
<version>0.0.1-SNAPSHOT</version>
</parent>
<artifactId>milvuswriter</artifactId>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<kotlin.code.style>official</kotlin.code.style>
<kotlin.compiler.jvmTarget>1.8</kotlin.compiler.jvmTarget>
</properties>
<dependencyManagement>
<dependencies>
<dependency>
<artifactId>guava</artifactId>
<groupId>com.google.guava</groupId>
<version>32.0.1-jre</version>
</dependency>
</dependencies>
</dependencyManagement>
<dependencies>
<dependency>
<groupId>io.milvus</groupId>
<artifactId>milvus-sdk-java</artifactId>
<version>2.4.8</version>
</dependency>
<dependency>
<groupId>org.jetbrains.kotlin</groupId>
<artifactId>kotlin-test-junit5</artifactId>
<version>2.0.0</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter</artifactId>
<version>5.10.0</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.jetbrains.kotlin</groupId>
<artifactId>kotlin-stdlib</artifactId>
<version>2.0.0</version>
</dependency>
<dependency>
<groupId>com.alibaba.datax</groupId>
<artifactId>datax-common</artifactId>
<version>0.0.1-SNAPSHOT</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.30</version>
<scope>provided</scope>
</dependency>
</dependencies>
<build>
<resources>
<!--将resource目录也输出到target-->
<resource>
<directory>src/main/resources</directory>
<includes>
<include>**/*.*</include>
</includes>
<filtering>true</filtering>
</resource>
</resources>
<plugins>
<!-- compiler plugin -->
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>${jdk-version}</source>
<target>${jdk-version}</target>
<encoding>${project-sourceEncoding}</encoding>
</configuration>
</plugin>
<!-- assembly plugin -->
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<descriptors>
<descriptor>src/main/assembly/package.xml</descriptor>
</descriptors>
<finalName>datax</finalName>
</configuration>
<executions>
<execution>
<id>dwzip</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>

View File

@ -0,0 +1,36 @@
<assembly
xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0 http://maven.apache.org/xsd/assembly-1.1.0.xsd">
<id></id>
<formats>
<format>dir</format>
</formats>
<includeBaseDirectory>false</includeBaseDirectory>
<fileSets>
<fileSet>
<directory>src/main/resources</directory>
<includes>
<include>plugin.json</include>
<include>plugin_job_template.json</include>
</includes>
<outputDirectory>plugin/writer/milvuswriter</outputDirectory>
</fileSet>
<fileSet>
<directory>target/</directory>
<includes>
<include>milvuswriter-0.0.1-SNAPSHOT.jar</include>
</includes>
<outputDirectory>plugin/writer/milvuswriter</outputDirectory>
</fileSet>
</fileSets>
<dependencySets>
<dependencySet>
<useProjectArtifact>false</useProjectArtifact>
<outputDirectory>plugin/writer/milvuswriter/libs</outputDirectory>
<scope>runtime</scope>
</dependencySet>
</dependencySets>
</assembly>

View File

@ -0,0 +1,130 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.datax.plugin.writer.milvuswriter;
import java.nio.Buffer;
import java.nio.ByteBuffer;
public class BufferUtils {
public static ByteBuffer toByteBuffer(Short[] shortArray) {
ByteBuffer byteBuffer = ByteBuffer.allocate(shortArray.length * 2);
for (Short value : shortArray) {
byteBuffer.putShort(value);
}
// Compatible compilation and running versions are not consistent
// Flip the buffer to prepare for reading
((Buffer) byteBuffer).flip();
return byteBuffer;
}
public static Short[] toShortArray(ByteBuffer byteBuffer) {
Short[] shortArray = new Short[byteBuffer.capacity() / 2];
for (int i = 0; i < shortArray.length; i++) {
shortArray[i] = byteBuffer.getShort();
}
return shortArray;
}
public static ByteBuffer toByteBuffer(Float[] floatArray) {
ByteBuffer byteBuffer = ByteBuffer.allocate(floatArray.length * 4);
for (float value : floatArray) {
byteBuffer.putFloat(value);
}
((Buffer) byteBuffer).flip();
return byteBuffer;
}
public static Float[] toFloatArray(ByteBuffer byteBuffer) {
Float[] floatArray = new Float[byteBuffer.capacity() / 4];
for (int i = 0; i < floatArray.length; i++) {
floatArray[i] = byteBuffer.getFloat();
}
return floatArray;
}
public static ByteBuffer toByteBuffer(Double[] doubleArray) {
ByteBuffer byteBuffer = ByteBuffer.allocate(doubleArray.length * 8);
for (double value : doubleArray) {
byteBuffer.putDouble(value);
}
((Buffer) byteBuffer).flip();
return byteBuffer;
}
public static Double[] toDoubleArray(ByteBuffer byteBuffer) {
Double[] doubleArray = new Double[byteBuffer.capacity() / 8];
for (int i = 0; i < doubleArray.length; i++) {
doubleArray[i] = byteBuffer.getDouble();
}
return doubleArray;
}
public static ByteBuffer toByteBuffer(Integer[] intArray) {
ByteBuffer byteBuffer = ByteBuffer.allocate(intArray.length * 4);
for (int value : intArray) {
byteBuffer.putInt(value);
}
((Buffer) byteBuffer).flip();
return byteBuffer;
}
public static Integer[] toIntArray(ByteBuffer byteBuffer) {
Integer[] intArray = new Integer[byteBuffer.capacity() / 4];
for (int i = 0; i < intArray.length; i++) {
intArray[i] = byteBuffer.getInt();
}
return intArray;
}
}

View File

@ -0,0 +1,20 @@
package com.alibaba.datax.plugin.writer.milvuswriter;
public class KeyConstant {
public static final String URI = "uri";
public static final String TOKEN = "token";
public static final String DATABASE = "database";
public static final String COLLECTION = "collection";
public static final String AUTO_ID = "autoId";
public static final String ENABLE_DYNAMIC_SCHEMA = "enableDynamicSchema";
public static final String BATCH_SIZE = "batchSize";
public static final String COLUMN = "column";
public static final String COLUMN_TYPE = "type";
public static final String COLUMN_NAME = "name";
public static final String VECTOR_DIMENSION = "dimension";
public static final String IS_PRIMARY_KEY = "isPrimaryKey";
// "schemaCreateMode":"createWhenTableNotExit"/"Ignore"/"exception"
public static final String schemaCreateMode = "schemaCreateMode";
public static final String IS_PARTITION_KEY = "isPartitionKey";
public static final String MAX_LENGTH = "maxLength";
}

View File

@ -0,0 +1,43 @@
package com.alibaba.datax.plugin.writer.milvuswriter;
import com.google.gson.JsonObject;
import io.milvus.v2.client.MilvusClientV2;
import io.milvus.v2.service.vector.request.UpsertReq;
import lombok.extern.slf4j.Slf4j;
import java.util.ArrayList;
import java.util.List;
@Slf4j
public class MilvusBufferWriter {
private final MilvusClientV2 milvusClientV2;
private final String collection;
private final Integer batchSize;
private List<JsonObject> dataCache;
public MilvusBufferWriter(MilvusClientV2 milvusClientV2, String collection, Integer batchSize){
this.milvusClientV2 = milvusClientV2;
this.collection = collection;
this.batchSize = batchSize;
this.dataCache = new ArrayList<>();
}
public void write(JsonObject data){
dataCache.add(data);
}
public Boolean needCommit(){
return dataCache.size() >= batchSize;
}
public void commit(){
if(dataCache.isEmpty()){
log.info("dataCache is empty, skip commit");
return;
}
UpsertReq upsertReq = UpsertReq.builder()
.collectionName(collection)
.data(dataCache)
.build();
milvusClientV2.upsert(upsertReq);
dataCache = new ArrayList<>();
}
}

View File

@ -0,0 +1,86 @@
package com.alibaba.datax.plugin.writer.milvuswriter;
import com.alibaba.datax.common.element.Record;
import com.alibaba.fastjson2.JSONArray;
import com.google.gson.Gson;
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
import io.milvus.v2.common.DataType;
import static io.milvus.v2.common.DataType.*;
import io.milvus.v2.service.collection.request.AddFieldReq;
import io.milvus.v2.service.collection.request.CreateCollectionReq;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.stream.Collectors;
public class MilvusSinkConverter {
public JsonObject convertByType(JSONArray milvusColumnMeta, Record record) {
JsonObject data = new JsonObject();
Gson gson = new Gson();
for(int i = 0; i < record.getColumnNumber(); i++) {
String fieldType = milvusColumnMeta.getJSONObject(i).getString(KeyConstant.COLUMN_TYPE);
String fieldName = milvusColumnMeta.getJSONObject(i).getString(KeyConstant.COLUMN_NAME);
Object rawData = record.getColumn(i).getRawData();
Object field = convertToMilvusField(fieldType, rawData);
data.add(fieldName, gson.toJsonTree(field));
}
return data;
}
private Object convertToMilvusField(String type, Object rawData) {
Gson gson = new Gson();
switch (valueOf(type)) {
case Int32:
return Integer.parseInt(rawData.toString());
case Int64:
return Long.parseLong(rawData.toString());
case Float:
return java.lang.Float.parseFloat(rawData.toString());
case String:
case VarChar:
return rawData.toString();
case Bool:
return Boolean.parseBoolean(rawData.toString());
case FloatVector:
java.lang.Float[] floats = Arrays.stream(rawData.toString().split(",")).map(java.lang.Float::parseFloat).toArray(java.lang.Float[]::new);
return Arrays.stream(floats).collect(Collectors.toList());
case BinaryVector:
java.lang.Integer[] binarys = Arrays.stream(rawData.toString().split(",")).map(java.lang.Integer::parseInt).toArray(java.lang.Integer[]::new);
return BufferUtils.toByteBuffer(binarys);
case Float16Vector:
case BFloat16Vector:
// all these data is byte format in milvus
ByteBuffer binaryVector = (ByteBuffer) rawData;
return gson.toJsonTree(binaryVector.array());
case SparseFloatVector:
return JsonParser.parseString(gson.toJson(rawData)).getAsJsonObject();
default:
throw new RuntimeException("Unsupported data type");
}
}
public CreateCollectionReq.CollectionSchema prepareCollectionSchema(JSONArray milvusColumnMeta) {
CreateCollectionReq.CollectionSchema collectionSchema = CreateCollectionReq.CollectionSchema.builder().build();
for (int i = 0; i < milvusColumnMeta.size(); i++) {
AddFieldReq addFieldReq = AddFieldReq.builder()
.fieldName(milvusColumnMeta.getJSONObject(i).getString(KeyConstant.COLUMN_NAME))
.dataType(valueOf(milvusColumnMeta.getJSONObject(i).getString(KeyConstant.COLUMN_TYPE)))
.build();
if(milvusColumnMeta.getJSONObject(i).containsKey(KeyConstant.IS_PRIMARY_KEY)) {
addFieldReq.setIsPrimaryKey(milvusColumnMeta.getJSONObject(i).getBoolean(KeyConstant.IS_PRIMARY_KEY));
}
if(milvusColumnMeta.getJSONObject(i).containsKey(KeyConstant.VECTOR_DIMENSION)) {
addFieldReq.setDimension(milvusColumnMeta.getJSONObject(i).getInteger(KeyConstant.VECTOR_DIMENSION));
}
if(milvusColumnMeta.getJSONObject(i).containsKey(KeyConstant.IS_PARTITION_KEY)) {
addFieldReq.setIsPartitionKey(milvusColumnMeta.getJSONObject(i).getBoolean(KeyConstant.IS_PARTITION_KEY));
}
if(milvusColumnMeta.getJSONObject(i).containsKey(KeyConstant.MAX_LENGTH)) {
addFieldReq.setMaxLength(milvusColumnMeta.getJSONObject(i).getInteger(KeyConstant.MAX_LENGTH));
}
collectionSchema.addField(addFieldReq);
}
return collectionSchema;
}
}

View File

@ -0,0 +1,130 @@
package com.alibaba.datax.plugin.writer.milvuswriter;
import com.alibaba.datax.common.element.Record;
import com.alibaba.datax.common.plugin.RecordReceiver;
import com.alibaba.datax.common.spi.Writer;
import com.alibaba.datax.common.util.Configuration;
import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.JSONArray;
import com.google.gson.JsonObject;
import io.milvus.v2.client.ConnectConfig;
import io.milvus.v2.client.MilvusClientV2;
import io.milvus.v2.common.DataType;
import io.milvus.v2.service.collection.request.AddFieldReq;
import io.milvus.v2.service.collection.request.CreateCollectionReq;
import io.milvus.v2.service.collection.request.HasCollectionReq;
import io.milvus.v2.service.vector.request.UpsertReq;
import lombok.extern.slf4j.Slf4j;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
@Slf4j
public class MilvusWriter extends Writer {
public static class Job extends Writer.Job {
private Configuration originalConfig = null;
/**
* 切分任务<br>
*
* @param mandatoryNumber 为了做到ReaderWriter任务数对等这里要求Writer插件必须按照源端的切分数进行切分否则框架报错
*/
@Override
public List<Configuration> split(int mandatoryNumber) {
List<Configuration> configList = new ArrayList<Configuration>();
for(int i = 0; i < mandatoryNumber; i++) {
configList.add(this.originalConfig.clone());
}
return configList;
}
@Override
public void init() {
this.originalConfig = super.getPluginJobConf();
}
@Override
public void destroy() {
}
}
public static class Task extends Writer.Task {
private MilvusClientV2 milvusClientV2;
private MilvusSinkConverter milvusSinkConverter;
private MilvusBufferWriter milvusBufferWriter;
private String collection = null;
private JSONArray milvusColumnMeta;
private String schemaCreateMode = "createWhenTableNotExit";
@Override
public void startWrite(RecordReceiver lineReceiver) {
Record record = lineReceiver.getFromReader();
JsonObject data = milvusSinkConverter.convertByType(milvusColumnMeta, record);
milvusBufferWriter.write(data);
if(milvusBufferWriter.needCommit()){
log.info("Reached buffer limit, Committing data");
milvusBufferWriter.commit();
log.info("Data committed");
}
}
@Override
public void init() {
log.info("Initializing Milvus writer");
// get configuration
Configuration writerSliceConfig = this.getPluginJobConf();
this.collection = writerSliceConfig.getString(KeyConstant.COLLECTION);
this.milvusColumnMeta = JSON.parseArray(writerSliceConfig.getString(KeyConstant.COLUMN));
this.schemaCreateMode = writerSliceConfig.getString(KeyConstant.schemaCreateMode) == null ?
"createWhenTableNotExit" : writerSliceConfig.getString(KeyConstant.schemaCreateMode);
int batchSize = writerSliceConfig.getInt(KeyConstant.BATCH_SIZE, 100);
log.info("Collection:{}", this.collection);
// connect to milvus
ConnectConfig connectConfig = ConnectConfig.builder()
.uri(writerSliceConfig.getString(KeyConstant.URI))
.token(writerSliceConfig.getString(KeyConstant.TOKEN))
.build();
if(writerSliceConfig.getString(KeyConstant.DATABASE) == null) {
log.warn("Database is set, using database{}", writerSliceConfig.getString(KeyConstant.DATABASE));
connectConfig.setDbName(writerSliceConfig.getString(KeyConstant.DATABASE));
}
this.milvusClientV2 = new MilvusClientV2(connectConfig);
this.milvusSinkConverter = new MilvusSinkConverter();
this.milvusBufferWriter = new MilvusBufferWriter(milvusClientV2, collection, batchSize);
log.info("Milvus writer initialized");
}
@Override
public void prepare() {
super.prepare();
Boolean hasCollection = milvusClientV2.hasCollection(HasCollectionReq.builder().collectionName(collection).build());
if (!hasCollection) {
log.info("Collection not exist");
if (schemaCreateMode.equals("createWhenTableNotExit")) {
// create collection
log.info("Creating collection:{}", this.collection);
CreateCollectionReq.CollectionSchema collectionSchema = milvusSinkConverter.prepareCollectionSchema(milvusColumnMeta);
CreateCollectionReq createCollectionReq = CreateCollectionReq.builder()
.collectionName(collection)
.collectionSchema(collectionSchema)
.build();
milvusClientV2.createCollection(createCollectionReq);
} else if (schemaCreateMode.equals("exception")) {
log.error("Collection not exist, throw exception");
throw new RuntimeException("Collection not exist");
}
}
}
@Override
public void destroy() {
log.info("Closing Milvus writer, committing data and closing connection");
this.milvusBufferWriter.commit();
this.milvusClientV2.close();
}
}
}

View File

@ -0,0 +1,6 @@
{
"name": "milvuswriter",
"class": "com.alibaba.datax.plugin.writer.milvuswriter.MilvusWriter",
"description": "useScene: prod. mechanism: via milvusclient connect milvus write data concurrent.",
"developer": "nianliuu"
}

View File

@ -0,0 +1,15 @@
{
"name": "mongodbwriter",
"parameter": {
"address": [],
"userName": "",
"userPassword": "",
"dbName": "",
"collectionName": "",
"column": [],
"upsertInfo": {
"isUpsert": "",
"upsertKey": ""
}
}
}

View File

@ -129,6 +129,7 @@
<module>adbmysqlwriter</module>
<module>sybasewriter</module>
<module>neo4jwriter</module>
<module>milvuswriter</module>
<!-- common support module -->
<module>plugin-rdbms-util</module>
<module>plugin-unstructured-storage-util</module>