add parquet encoding (#64)

* add parquet encoding

* add alternative content-type for parquet encoding
This commit is contained in:
Erwin Kroon 2021-08-30 08:45:13 +02:00 committed by GitHub
parent 0e8690d509
commit c79a35a3fc
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 41 additions and 0 deletions

View File

@ -4,6 +4,7 @@ pub enum ContentType {
Json,
Csv,
ArrowStream,
Parquet,
}
impl ContentType {
@ -12,6 +13,7 @@ impl ContentType {
ContentType::Json => "application/json",
ContentType::Csv => "application/csv",
ContentType::ArrowStream => "application/vnd.apache.arrow.stream",
ContentType::Parquet => "application/parquet",
}
}
}
@ -26,6 +28,7 @@ impl TryFrom<&[u8]> for ContentType {
b"application/arrow.stream" | b"application/vnd.apache.arrow.stream" => {
Ok(ContentType::ArrowStream)
}
b"application/parquet" | b"application/vnd.apache.parquet" => Ok(ContentType::Parquet),
_ => Err(()),
}
}
@ -34,3 +37,4 @@ impl TryFrom<&[u8]> for ContentType {
pub mod arrow;
pub mod csv;
pub mod json;
pub mod parquet;

View File

@ -0,0 +1,23 @@
use datafusion::arrow;
use datafusion::parquet;
use datafusion::parquet::errors::ParquetError;
pub fn record_batches_to_bytes(
batches: &[arrow::record_batch::RecordBatch],
) -> Result<Vec<u8>, ParquetError> {
let cursor = parquet::util::cursor::InMemoryWriteableCursor::default();
{
if !batches.is_empty() {
let schema = batches[0].schema();
let mut writer = parquet::arrow::ArrowWriter::try_new(cursor.clone(), schema, None)?;
for batch in batches {
writer.write(batch)?;
}
writer.close()?;
}
}
let result = cursor.into_inner().expect("Should not fail");
Ok(result)
}

View File

@ -68,6 +68,11 @@ pub fn encode_record_batches(
.map_err(ApiErrResp::arrow_stream_serialization)?;
Ok(builder.body(payload))
}
encoding::ContentType::Parquet => {
let payload = encoding::parquet::record_batches_to_bytes(batches)
.map_err(ApiErrResp::parquet_serialization)?;
Ok(builder.body(payload))
}
}
}

View File

@ -2,6 +2,7 @@ use std::fmt;
use actix_web::{http, HttpResponse};
use columnq::datafusion::arrow;
use columnq::datafusion::parquet;
use columnq::error::QueryError;
use serde::Serializer;
use serde_derive::Serialize;
@ -47,6 +48,14 @@ impl ApiErrResp {
}
}
pub fn parquet_serialization(_: parquet::errors::ParquetError) -> Self {
Self {
code: http::StatusCode::INTERNAL_SERVER_ERROR,
error: "parquet_serialization".to_string(),
message: "Failed to serialize record batches into parquet".to_string(),
}
}
pub fn read_query(error: std::str::Utf8Error) -> Self {
Self {
code: http::StatusCode::BAD_REQUEST,