diff --git a/columnq/src/encoding/mod.rs b/columnq/src/encoding/mod.rs index 57bec1e..3e8f30d 100644 --- a/columnq/src/encoding/mod.rs +++ b/columnq/src/encoding/mod.rs @@ -4,6 +4,7 @@ pub enum ContentType { Json, Csv, ArrowStream, + Parquet, } impl ContentType { @@ -12,6 +13,7 @@ impl ContentType { ContentType::Json => "application/json", ContentType::Csv => "application/csv", ContentType::ArrowStream => "application/vnd.apache.arrow.stream", + ContentType::Parquet => "application/parquet", } } } @@ -26,6 +28,7 @@ impl TryFrom<&[u8]> for ContentType { b"application/arrow.stream" | b"application/vnd.apache.arrow.stream" => { Ok(ContentType::ArrowStream) } + b"application/parquet" | b"application/vnd.apache.parquet" => Ok(ContentType::Parquet), _ => Err(()), } } @@ -34,3 +37,4 @@ impl TryFrom<&[u8]> for ContentType { pub mod arrow; pub mod csv; pub mod json; +pub mod parquet; diff --git a/columnq/src/encoding/parquet.rs b/columnq/src/encoding/parquet.rs new file mode 100644 index 0000000..fe50aee --- /dev/null +++ b/columnq/src/encoding/parquet.rs @@ -0,0 +1,23 @@ +use datafusion::arrow; +use datafusion::parquet; +use datafusion::parquet::errors::ParquetError; + +pub fn record_batches_to_bytes( + batches: &[arrow::record_batch::RecordBatch], +) -> Result<Vec<u8>, ParquetError> { + let cursor = parquet::util::cursor::InMemoryWriteableCursor::default(); + { + if !batches.is_empty() { + let schema = batches[0].schema(); + + let mut writer = parquet::arrow::ArrowWriter::try_new(cursor.clone(), schema, None)?; + for batch in batches { + writer.write(batch)?; + } + writer.close()?; + } + } + + let result = cursor.into_inner().expect("Should not fail"); + Ok(result) +} diff --git a/roapi-http/src/api/mod.rs b/roapi-http/src/api/mod.rs index 2682cf1..1c437d6 100644 --- a/roapi-http/src/api/mod.rs +++ b/roapi-http/src/api/mod.rs @@ -68,6 +68,11 @@ pub fn encode_record_batches( .map_err(ApiErrResp::arrow_stream_serialization)?; Ok(builder.body(payload)) } + 
encoding::ContentType::Parquet => { + let payload = encoding::parquet::record_batches_to_bytes(batches) + .map_err(ApiErrResp::parquet_serialization)?; + Ok(builder.body(payload)) + } } } diff --git a/roapi-http/src/error.rs b/roapi-http/src/error.rs index 1e15958..68388ed 100644 --- a/roapi-http/src/error.rs +++ b/roapi-http/src/error.rs @@ -2,6 +2,7 @@ use std::fmt; use actix_web::{http, HttpResponse}; use columnq::datafusion::arrow; +use columnq::datafusion::parquet; use columnq::error::QueryError; use serde::Serializer; use serde_derive::Serialize; @@ -47,6 +48,14 @@ impl ApiErrResp { } } + pub fn parquet_serialization(_: parquet::errors::ParquetError) -> Self { + Self { + code: http::StatusCode::INTERNAL_SERVER_ERROR, + error: "parquet_serialization".to_string(), + message: "Failed to serialize record batches into parquet".to_string(), + } + } + pub fn read_query(error: std::str::Utf8Error) -> Self { Self { code: http::StatusCode::BAD_REQUEST,