From 2f4646bbb40aacc7dc420448a8bbf4dba83fd05a Mon Sep 17 00:00:00 2001 From: QP Hou Date: Sat, 20 Feb 2021 14:42:22 -0800 Subject: [PATCH] feat: support http store for parquet data source (#5) --- Cargo.lock | 6 +-- Cargo.toml | 6 +-- columnq/src/io/mod.rs | 8 ++-- columnq/src/lib.rs | 4 +- columnq/src/table/csv.rs | 5 +- columnq/src/table/json.rs | 17 +++---- columnq/src/table/mod.rs | 11 ++++- columnq/src/table/parquet.rs | 91 ++++++++++++++++++++++++++++++------ 8 files changed, 106 insertions(+), 42 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 10ea9ea..46a99cd 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -293,7 +293,7 @@ checksum = "23b62fc65de8e4e7f52534fb52b0f3ed04746ae267519eef2a83941e8085068b" [[package]] name = "arrow" version = "4.0.0-SNAPSHOT" -source = "git+https://github.com/houqp/arrow.git?rev=254318d1d3fca67dd60483e121299b52cf7fab44#254318d1d3fca67dd60483e121299b52cf7fab44" +source = "git+https://github.com/houqp/arrow.git?rev=e5b567d3582ba9fa91b29ab1c944de97b491bacb#e5b567d3582ba9fa91b29ab1c944de97b491bacb" dependencies = [ "cfg_aliases", "chrono", @@ -778,7 +778,7 @@ dependencies = [ [[package]] name = "datafusion" version = "4.0.0-SNAPSHOT" -source = "git+https://github.com/houqp/arrow.git?rev=254318d1d3fca67dd60483e121299b52cf7fab44#254318d1d3fca67dd60483e121299b52cf7fab44" +source = "git+https://github.com/houqp/arrow.git?rev=e5b567d3582ba9fa91b29ab1c944de97b491bacb#e5b567d3582ba9fa91b29ab1c944de97b491bacb" dependencies = [ "ahash 0.7.0", "arrow", @@ -1711,7 +1711,7 @@ dependencies = [ [[package]] name = "parquet" version = "4.0.0-SNAPSHOT" -source = "git+https://github.com/houqp/arrow.git?rev=254318d1d3fca67dd60483e121299b52cf7fab44#254318d1d3fca67dd60483e121299b52cf7fab44" +source = "git+https://github.com/houqp/arrow.git?rev=e5b567d3582ba9fa91b29ab1c944de97b491bacb#e5b567d3582ba9fa91b29ab1c944de97b491bacb" dependencies = [ "arrow", "base64 0.12.3", diff --git a/Cargo.toml b/Cargo.toml index acc2029..2099319 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -6,9 +6,9 @@ members = [ ] [patch.crates-io] -arrow = { git = "https://github.com/houqp/arrow.git", rev = "254318d1d3fca67dd60483e121299b52cf7fab44" } -parquet = { git = "https://github.com/houqp/arrow.git", rev = "254318d1d3fca67dd60483e121299b52cf7fab44" } -datafusion = { git = "https://github.com/houqp/arrow.git", rev = "254318d1d3fca67dd60483e121299b52cf7fab44" } +arrow = { git = "https://github.com/houqp/arrow.git", rev = "e5b567d3582ba9fa91b29ab1c944de97b491bacb" } +parquet = { git = "https://github.com/houqp/arrow.git", rev = "e5b567d3582ba9fa91b29ab1c944de97b491bacb" } +datafusion = { git = "https://github.com/houqp/arrow.git", rev = "e5b567d3582ba9fa91b29ab1c944de97b491bacb" } actix-web = { git = "https://github.com/actix/actix-web.git", rev = "b37669cb3b47eeb60c161ff12156d1c877af91a2" } actix-http = { git = "https://github.com/actix/actix-web.git", rev = "b37669cb3b47eeb60c161ff12156d1c877af91a2" } diff --git a/columnq/src/io/mod.rs b/columnq/src/io/mod.rs index 46784a5..1d8a748 100644 --- a/columnq/src/io/mod.rs +++ b/columnq/src/io/mod.rs @@ -6,10 +6,10 @@ use crate::error::ColumnQError; pub fn partitions_from_fs_uri( uri: &URIReference, - partition_reader: F, + mut partition_reader: F, ) -> Result where - F: Fn(std::fs::File) -> Result, + F: FnMut(std::fs::File) -> Result, { let reader = fs::File::open(uri.path().to_string()) .map_err(|e| ColumnQError::FileStore(format!("open file error: {}", e)))?; @@ -18,10 +18,10 @@ where pub async fn partitions_from_http_uri<'a, 'b, F, T>( uri: &'a URIReference<'b>, - partition_reader: F, + mut partition_reader: F, ) -> Result where - F: Fn(std::io::Cursor) -> Result, + F: FnMut(std::io::Cursor) -> Result, { let resp = reqwest::get(&uri.to_string()) .await diff --git a/columnq/src/lib.rs b/columnq/src/lib.rs index 66745e9..bfe833a 100644 --- a/columnq/src/lib.rs +++ b/columnq/src/lib.rs @@ -7,10 +7,10 @@ macro_rules! with_reader_from_uri { ($call_with_r:expr, $uri:ident) => { match $uri.scheme() { // default to local file when schema is not provided - None | Some(Scheme::FileSystem) => { + None | Some(uriparse::Scheme::FileSystem) => { crate::io::partitions_from_fs_uri(&$uri, $call_with_r) } - Some(Scheme::HTTP) | Some(Scheme::HTTPS) => { + Some(uriparse::Scheme::HTTP) | Some(uriparse::Scheme::HTTPS) => { crate::io::partitions_from_http_uri(&$uri, $call_with_r).await } // "s3" => {} diff --git a/columnq/src/table/csv.rs b/columnq/src/table/csv.rs index da82af8..23784b6 100644 --- a/columnq/src/table/csv.rs +++ b/columnq/src/table/csv.rs @@ -1,8 +1,6 @@ -use std::convert::TryFrom; use std::sync::Arc; use arrow::record_batch::RecordBatch; -use uriparse::{Scheme, URIReference}; use crate::error::ColumnQError; use crate::table::TableSource; @@ -16,8 +14,7 @@ pub async fn to_mem_table( let batch_size = 1024; let projection = None; - let uri = URIReference::try_from(t.uri.as_str()) - .map_err(|_| ColumnQError::InvalidUri(t.uri.clone()))?; + let uri = t.parsed_uri()?; let schema_ref: arrow::datatypes::SchemaRef = match &t.schema { Some(s) => Arc::new(s.into()), diff --git a/columnq/src/table/json.rs b/columnq/src/table/json.rs index 8c5e735..ceeae4d 100644 --- a/columnq/src/table/json.rs +++ b/columnq/src/table/json.rs @@ -1,9 +1,8 @@ -use std::convert::TryFrom; use std::io::Read; use std::sync::Arc; use serde_json::value::Value; -use uriparse::{Scheme, URIReference}; +use uriparse::URIReference; use crate::error::ColumnQError; use crate::table::{TableLoadOption, TableSource}; @@ -12,13 +11,10 @@ fn json_value_from_reader(r: R) -> Result { serde_json::from_reader(r).map_err(ColumnQError::json_parse) } -async fn load_array_by_path( - uri_s: &str, - pointer: Option<&str>, +async fn load_array_by_path<'a>( + uri: URIReference<'a>, + pointer: Option<&'a str>, ) -> Result, ColumnQError> { - let uri = - URIReference::try_from(uri_s).map_err(|_| ColumnQError::InvalidUri(uri_s.to_string()))?; - let payload: Value = with_reader_from_uri!(json_value_from_reader, uri)?; let mut value_ref: &Value = &payload; @@ -47,6 +43,7 @@ async fn load_array_by_path( pub async fn to_mem_table( t: &TableSource, ) -> Result { + // TODO: make batch size configurable let batch_size = 1024; let array_encoded = match &t.option { Some(TableLoadOption::json { array_encoded, .. }) => array_encoded.unwrap_or(false), @@ -65,7 +62,7 @@ pub async fn to_mem_table( }; // load array from file - let json_rows = load_array_by_path(&t.uri, pointer.as_deref()).await?; + let json_rows = load_array_by_path(t.parsed_uri()?, pointer.as_deref()).await?; if json_rows.is_empty() { match pointer { @@ -152,7 +149,7 @@ mod tests { use crate::test_util::*; #[tokio::test] - async fn test_nested_struct_and_lists() -> Result<(), ColumnQError> { + async fn nested_struct_and_lists() -> Result<(), ColumnQError> { let t = to_mem_table(&TableSource { name: "spacex_launches".to_string(), uri: test_data_path("spacex-launches.json"), diff --git a/columnq/src/table/mod.rs b/columnq/src/table/mod.rs index d1213ed..a484326 100644 --- a/columnq/src/table/mod.rs +++ b/columnq/src/table/mod.rs @@ -1,7 +1,9 @@ +use std::convert::TryFrom; use std::ffi::OsStr; use std::path::Path; use serde_derive::Deserialize; +use uriparse::URIReference; use crate::error::ColumnQError; @@ -87,6 +89,11 @@ impl TableSource { option: None, } } + + pub fn parsed_uri(&self) -> Result { + URIReference::try_from(self.uri.as_str()) + .map_err(|_| ColumnQError::InvalidUri(self.uri.clone())) + } } pub async fn load(t: &TableSource) -> Result { @@ -95,7 +102,7 @@ pub async fn load(t: &TableSource) -> Result json::to_mem_table(t).await?, TableLoadOption::csv { .. } => csv::to_mem_table(t).await?, - TableLoadOption::parquet { .. } => parquet::to_mem_table(t)?, + TableLoadOption::parquet { .. } => parquet::to_mem_table(t).await?, TableLoadOption::google_spreadsheet(_) => google_spreadsheets::to_mem_table(t).await?, }); } @@ -105,7 +112,7 @@ pub async fn load(t: &TableSource) -> Result csv::to_mem_table(t).await?, Some("json") => json::to_mem_table(t).await?, - Some("parquet") => parquet::to_mem_table(t)?, + Some("parquet") => parquet::to_mem_table(t).await?, Some(ext) => { return Err(ColumnQError::InvalidUri(format!( "failed to register `{}` as table `{}`, unsupported table format `{}`", diff --git a/columnq/src/table/parquet.rs b/columnq/src/table/parquet.rs index b8ed413..4b94310 100644 --- a/columnq/src/table/parquet.rs +++ b/columnq/src/table/parquet.rs @@ -1,31 +1,94 @@ -use std::fs::File; +use std::io::Read; use std::sync::Arc; -use arrow::record_batch::RecordBatchReader; +use arrow::datatypes::Schema; +use arrow::record_batch::RecordBatch; use parquet::arrow::{ArrowReader, ParquetFileArrowReader}; use parquet::file::reader::SerializedFileReader; +use parquet::file::serialized_reader::SliceableCursor; use crate::error::ColumnQError; use crate::table::TableSource; -pub fn to_mem_table(t: &TableSource) -> Result { +pub async fn to_mem_table( + t: &TableSource, +) -> Result { + // TODO: make batch size configurable let batch_size = 1024; - let file = File::open(&t.uri).map_err(ColumnQError::open_parquet_file)?; - let file_reader = SerializedFileReader::new(file).map_err(ColumnQError::parquet_file_reader)?; - let mut arrow_reader = ParquetFileArrowReader::new(Arc::new(file_reader)); + let mut schema: Option = None; - let record_batch_reader = arrow_reader - .get_record_reader(batch_size) - .map_err(ColumnQError::parquet_record_reader)?; - let schema_ref = record_batch_reader.schema(); + let uri = t.parsed_uri()?; - let partition = record_batch_reader - .into_iter() - .collect::>>()?; + let partition: Vec = with_reader_from_uri!( + |mut r| -> Result, ColumnQError> { + // TODO: this is very inefficient, we are copying the parquet data in memory twice when + // it's being fetched from http store + let mut buffer = Vec::new(); + r.read_to_end(&mut buffer).map_err(|_| { + ColumnQError::LoadParquet("failed to copy parquet data in memory".to_string()) + })?; + + let file_reader = SerializedFileReader::new(SliceableCursor::new(buffer)) + .map_err(ColumnQError::parquet_file_reader)?; + let mut arrow_reader = ParquetFileArrowReader::new(Arc::new(file_reader)); + + let record_batch_reader = arrow_reader + .get_record_reader(batch_size) + .map_err(ColumnQError::parquet_record_reader)?; + + let batch_schema = arrow_reader.get_schema().map_err(|_| { + ColumnQError::LoadParquet("failed to load schema from partition".to_string()) + })?; + schema = Some(match &schema { + Some(s) => Schema::try_merge(vec![s.clone(), batch_schema])?, + None => batch_schema, + }); + + Ok(record_batch_reader + .into_iter() + .collect::>>()?) + }, + uri + )?; Ok(datafusion::datasource::MemTable::try_new( - schema_ref, + Arc::new( + schema.ok_or_else(|| ColumnQError::LoadParquet("failed to load schema".to_string()))?, + ), vec![partition], )?) } + +#[cfg(test)] +mod tests { + use super::*; + + use datafusion::datasource::TableProvider; + + use crate::test_util::*; + + #[tokio::test] + async fn simple_parquet_load() -> Result<(), ColumnQError> { + let t = to_mem_table(&TableSource { + name: "blogs".to_string(), + uri: test_data_path("blogs.parquet"), + schema: None, + option: None, + }) + .await?; + + let schema = t.schema(); + assert_eq!( + schema + .metadata() + .get("writer.model.name") + .map(|s| s.as_str()), + Some("protobuf") + ); + + assert_eq!(t.statistics().num_rows, Some(500)); + + Ok(()) + } +}