add support for all Arrow IPC formats in roapi-http (#67)

* add support for all arrow IPC formats in roapi-http

* refactor: schema inferrence and partitions in 1 loop
This commit is contained in:
Erwin Kroon 2021-09-05 23:27:09 +02:00 committed by GitHub
parent db143275a9
commit ff2d06b0e4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 237 additions and 2 deletions

View File

@ -1,6 +1,6 @@
use datafusion::arrow;
use datafusion::arrow::error::ArrowError;
use datafusion::arrow::ipc::writer::StreamWriter;
use datafusion::arrow::ipc::writer::{FileWriter, StreamWriter};
// streaming format spec:
// https://arrow.apache.org/docs/format/Columnar.html#ipc-streaming-format
@ -22,3 +22,22 @@ pub fn record_batches_to_stream_bytes(
Ok(buf)
}
pub fn record_batches_to_file_bytes(
batches: &[arrow::record_batch::RecordBatch],
) -> Result<Vec<u8>, ArrowError> {
let mut buf = Vec::new();
// TODO: write out schema regardless even for empty record batch?
// see: https://issues.apache.org/jira/browse/ARROW-2119
if !batches.is_empty() {
let schema = batches[0].schema();
let mut writer = FileWriter::try_new(&mut buf, &schema)?;
for batch in batches {
writer.write(batch)?;
}
writer.finish()?;
}
Ok(buf)
}

View File

@ -3,6 +3,7 @@ use std::convert::TryFrom;
pub enum ContentType {
Json,
Csv,
ArrowFile,
ArrowStream,
Parquet,
}
@ -12,6 +13,7 @@ impl ContentType {
match self {
ContentType::Json => "application/json",
ContentType::Csv => "application/csv",
ContentType::ArrowFile => "application/vnd.apache.arrow.file",
ContentType::ArrowStream => "application/vnd.apache.arrow.stream",
ContentType::Parquet => "application/parquet",
}
@ -25,6 +27,9 @@ impl TryFrom<&[u8]> for ContentType {
match value {
b"*/*" | b"application/json" => Ok(ContentType::Json),
b"application/csv" => Ok(ContentType::Csv),
b"application/arrow.file" | b"application/vnd.apache.arrow.file" => {
Ok(ContentType::ArrowFile)
}
b"application/arrow.stream" | b"application/vnd.apache.arrow.stream" => {
Ok(ContentType::ArrowStream)
}

View File

@ -0,0 +1,94 @@
use std::sync::Arc;
use datafusion::arrow;
use datafusion::arrow::datatypes::Schema;
use datafusion::arrow::record_batch::RecordBatch;
use log::debug;
use crate::error::ColumnQError;
use crate::table::TableSource;
pub async fn to_mem_table(
t: &TableSource,
) -> Result<datafusion::datasource::MemTable, ColumnQError> {
debug!("loading arrow table data...");
let mut schema_and_partitions = partitions_from_table_source!(t, |mut r| {
let arrow_file_reader = arrow::ipc::reader::FileReader::try_new(&mut r)?;
let schema = (*arrow_file_reader.schema()).clone();
arrow_file_reader
.into_iter()
.map(|batch| Ok(batch?))
.collect::<Result<Vec<RecordBatch>, ColumnQError>>()
.map(|batches| (Some(schema), batches))
})?;
let schema_ref = match &t.schema {
Some(s) => Arc::new(s.into()),
None => {
debug!("inferring arrow stream schema...");
Arc::new(Schema::try_merge(
schema_and_partitions
.iter_mut()
.map(|v| if !(v.1).is_empty() { v.0.take() } else { None })
.flatten()
.collect::<Vec<_>>(),
)?)
}
};
Ok(datafusion::datasource::MemTable::try_new(
schema_ref,
schema_and_partitions
.into_iter()
.map(|v| v.1)
.collect::<Vec<Vec<RecordBatch>>>(),
)?)
}
#[cfg(test)]
mod tests {
use std::fs;
use datafusion::datasource::TableProvider;
use crate::table::TableLoadOption;
use crate::test_util::*;
use super::*;
#[tokio::test]
async fn load_partitions() -> anyhow::Result<()> {
let tmp_dir = tempdir::TempDir::new("columnq.test.arrows_partitions")?;
let tmp_dir_path = tmp_dir.path();
let source_path = test_data_path("uk_cities_with_headers.arrow");
assert!(fs::copy(&source_path, tmp_dir_path.join("2020-01-01.arrow"))? > 0);
assert!(fs::copy(&source_path, tmp_dir_path.join("2020-01-02.arrow"))? > 0);
assert!(fs::copy(&source_path, tmp_dir_path.join("2020-01-03.arrow"))? > 0);
let t = to_mem_table(
&TableSource::new(
"uk_cities".to_string(),
tmp_dir_path.to_string_lossy().to_string(),
)
.with_option(TableLoadOption::arrow {}),
)
.await?;
assert_eq!(t.statistics().num_rows, Some(37 * 3));
Ok(())
}
#[tokio::test]
async fn load_file() -> anyhow::Result<()> {
let test_path = test_data_path("uk_cities_with_headers.arrow");
let t = to_mem_table(&TableSource::new("uk_cities".to_string(), test_path)).await?;
assert_eq!(t.statistics().num_rows, Some(37));
Ok(())
}
}

View File

@ -0,0 +1,94 @@
use std::sync::Arc;
use datafusion::arrow;
use datafusion::arrow::datatypes::Schema;
use datafusion::arrow::record_batch::RecordBatch;
use log::debug;
use crate::error::ColumnQError;
use crate::table::TableSource;
pub async fn to_mem_table(
t: &TableSource,
) -> Result<datafusion::datasource::MemTable, ColumnQError> {
debug!("loading arrow table data...");
let mut schema_and_partitions = partitions_from_table_source!(t, |mut r| {
let arrow_stream_reader = arrow::ipc::reader::StreamReader::try_new(&mut r)?;
let schema = (*arrow_stream_reader.schema()).clone();
arrow_stream_reader
.into_iter()
.map(|batch| Ok(batch?))
.collect::<Result<Vec<RecordBatch>, ColumnQError>>()
.map(|batches| (Some(schema), batches))
})?;
let schema_ref = match &t.schema {
Some(s) => Arc::new(s.into()),
None => {
debug!("inferring arrow stream schema...");
Arc::new(Schema::try_merge(
schema_and_partitions
.iter_mut()
.map(|v| if !(v.1).is_empty() { v.0.take() } else { None })
.flatten()
.collect::<Vec<_>>(),
)?)
}
};
Ok(datafusion::datasource::MemTable::try_new(
schema_ref,
schema_and_partitions
.into_iter()
.map(|v| v.1)
.collect::<Vec<Vec<RecordBatch>>>(),
)?)
}
#[cfg(test)]
mod tests {
use super::*;
use std::fs;
use datafusion::datasource::TableProvider;
use crate::table::TableLoadOption;
use crate::test_util::*;
#[tokio::test]
async fn load_partitions() -> anyhow::Result<()> {
let tmp_dir = tempdir::TempDir::new("columnq.test.arrows_partitions")?;
let tmp_dir_path = tmp_dir.path();
let source_path = test_data_path("uk_cities_with_headers.arrows");
assert!(fs::copy(&source_path, tmp_dir_path.join("2020-01-01.arrows"))? > 0);
assert!(fs::copy(&source_path, tmp_dir_path.join("2020-01-02.arrows"))? > 0);
assert!(fs::copy(&source_path, tmp_dir_path.join("2020-01-03.arrows"))? > 0);
let t = to_mem_table(
&TableSource::new(
"uk_cities".to_string(),
tmp_dir_path.to_string_lossy().to_string(),
)
.with_option(TableLoadOption::arrows {}),
)
.await?;
assert_eq!(t.statistics().num_rows, Some(37 * 3));
Ok(())
}
#[tokio::test]
async fn load_file() -> anyhow::Result<()> {
let test_path = test_data_path("uk_cities_with_headers.arrows");
let t = to_mem_table(&TableSource::new("uk_cities".to_string(), test_path)).await?;
assert_eq!(t.statistics().num_rows, Some(37));
Ok(())
}
}

View File

@ -13,6 +13,8 @@ use uriparse::URIReference;
use crate::error::ColumnQError;
pub mod arrow_ipc_file;
pub mod arrow_ipc_stream;
pub mod csv;
pub mod delta;
pub mod google_spreadsheets;
@ -164,6 +166,8 @@ pub enum TableLoadOption {
parquet(TableOptionParquet),
google_spreadsheet(TableOptionGoogleSpreasheet),
delta {},
arrow {},
arrows {},
}
impl TableLoadOption {
@ -197,6 +201,8 @@ impl TableLoadOption {
Self::csv { .. } => "csv",
Self::parquet { .. } => "parquet",
Self::google_spreadsheet(_) | Self::delta { .. } => "",
Self::arrow { .. } => "arrow",
Self::arrows { .. } => "arrows",
}
}
}
@ -318,7 +324,7 @@ impl TableSource {
})?;
match ext {
"csv" | "json" | "parquet" | "ndjson" => ext,
"csv" | "json" | "parquet" | "ndjson" | "arrow" | "arrows" => ext,
_ => {
return Err(ColumnQError::InvalidUri(format!(
"unsupported extension in uri: {}",
@ -347,6 +353,8 @@ pub async fn load(t: &TableSource) -> Result<Arc<dyn TableProvider>, ColumnQErro
Arc::new(google_spreadsheets::to_mem_table(t).await?)
}
TableLoadOption::delta { .. } => Arc::new(delta::to_mem_table(t).await?),
TableLoadOption::arrow { .. } => Arc::new(arrow_ipc_file::to_mem_table(t).await?),
TableLoadOption::arrows { .. } => Arc::new(arrow_ipc_stream::to_mem_table(t).await?),
})
} else {
let t: Arc<dyn TableProvider> = match t.extension()? {
@ -354,6 +362,8 @@ pub async fn load(t: &TableSource) -> Result<Arc<dyn TableProvider>, ColumnQErro
"json" => Arc::new(json::to_mem_table(t).await?),
"ndjson" => Arc::new(ndjson::to_mem_table(t).await?),
"parquet" => parquet::to_datafusion_table(t).await?,
"arrow" => Arc::new(arrow_ipc_file::to_mem_table(t).await?),
"arrows" => Arc::new(arrow_ipc_stream::to_mem_table(t).await?),
ext => {
return Err(ColumnQError::InvalidUri(format!(
"failed to register `{}` as table `{}`, unsupported table format `{}`",

View File

@ -63,6 +63,11 @@ pub fn encode_record_batches(
.map_err(ApiErrResp::csv_serialization)?;
Ok(builder.body(payload))
}
encoding::ContentType::ArrowFile => {
let payload = encoding::arrow::record_batches_to_file_bytes(batches)
.map_err(ApiErrResp::arrow_file_serialization)?;
Ok(builder.body(payload))
}
encoding::ContentType::ArrowStream => {
let payload = encoding::arrow::record_batches_to_stream_bytes(batches)
.map_err(ApiErrResp::arrow_stream_serialization)?;

View File

@ -40,6 +40,14 @@ impl ApiErrResp {
}
}
pub fn arrow_file_serialization(_: arrow::error::ArrowError) -> Self {
Self {
code: http::StatusCode::INTERNAL_SERVER_ERROR,
error: "arrow_file_serialization".to_string(),
message: "Failed to serialize record batches into arrow file".to_string(),
}
}
pub fn arrow_stream_serialization(_: arrow::error::ArrowError) -> Self {
Self {
code: http::StatusCode::INTERNAL_SERVER_ERROR,

Binary file not shown.

Binary file not shown.