From 2c39fb13e7e1ade927faa2a2226a566e10fd6737 Mon Sep 17 00:00:00 2001 From: QP Hou Date: Sun, 21 Feb 2021 01:09:39 -0800 Subject: [PATCH] support loading multiple partitions (#6) --- Cargo.lock | 60 +++++++++++- Cargo.toml | 6 +- README.md | 1 - columnq/Cargo.toml | 1 + columnq/src/io/mod.rs | 43 ++++++--- columnq/src/lib.rs | 16 ++-- columnq/src/table/csv.rs | 73 ++++++++++---- columnq/src/table/json.rs | 179 ++++++++++++++++++++--------------- columnq/src/table/mod.rs | 38 +++++++- columnq/src/table/parquet.rs | 51 +++++++--- 10 files changed, 336 insertions(+), 132 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 46a99cd..6a47a65 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -293,7 +293,7 @@ checksum = "23b62fc65de8e4e7f52534fb52b0f3ed04746ae267519eef2a83941e8085068b" [[package]] name = "arrow" version = "4.0.0-SNAPSHOT" -source = "git+https://github.com/houqp/arrow.git?rev=e5b567d3582ba9fa91b29ab1c944de97b491bacb#e5b567d3582ba9fa91b29ab1c944de97b491bacb" +source = "git+https://github.com/houqp/arrow.git?rev=555d12458a1c32301e0ce179143577f34fe78b6e#555d12458a1c32301e0ce179143577f34fe78b6e" dependencies = [ "cfg_aliases", "chrono", @@ -601,6 +601,7 @@ dependencies = [ "serde_json", "serde_yaml", "sqlparser 0.7.0", + "tempdir", "thiserror", "tokio", "uriparse", @@ -778,7 +779,7 @@ dependencies = [ [[package]] name = "datafusion" version = "4.0.0-SNAPSHOT" -source = "git+https://github.com/houqp/arrow.git?rev=e5b567d3582ba9fa91b29ab1c944de97b491bacb#e5b567d3582ba9fa91b29ab1c944de97b491bacb" +source = "git+https://github.com/houqp/arrow.git?rev=555d12458a1c32301e0ce179143577f34fe78b6e#555d12458a1c32301e0ce179143577f34fe78b6e" dependencies = [ "ahash 0.7.0", "arrow", @@ -963,6 +964,12 @@ dependencies = [ "winapi", ] +[[package]] +name = "fuchsia-cprng" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a06f77d526c1a601b7c4cdd98f54b5eaabffc14d5f2f0296febdc7f357c6d3ba" + [[package]] name = "futures" version = "0.3.12" @@ -1711,7 +1718,7 @@ dependencies = [ [[package]] name = "parquet" version = "4.0.0-SNAPSHOT" -source = "git+https://github.com/houqp/arrow.git?rev=e5b567d3582ba9fa91b29ab1c944de97b491bacb#e5b567d3582ba9fa91b29ab1c944de97b491bacb" +source = "git+https://github.com/houqp/arrow.git?rev=555d12458a1c32301e0ce179143577f34fe78b6e#555d12458a1c32301e0ce179143577f34fe78b6e" dependencies = [ "arrow", "base64 0.12.3", @@ -1880,6 +1887,19 @@ dependencies = [ "proc-macro2", ] +[[package]] +name = "rand" +version = "0.4.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "552840b97013b1a26992c11eac34bdd778e464601a4c2054b5f0bff7c6761293" +dependencies = [ + "fuchsia-cprng", + "libc", + "rand_core 0.3.1", + "rdrand", + "winapi", +] + [[package]] name = "rand" version = "0.7.3" @@ -1925,6 +1945,21 @@ dependencies = [ "rand_core 0.6.2", ] +[[package]] +name = "rand_core" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7a6fdeb83b075e8266dcc8762c22776f6877a63111121f5f8c7411e5be7eed4b" +dependencies = [ + "rand_core 0.4.2", +] + +[[package]] +name = "rand_core" +version = "0.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9c33a3c44ca05fa6f1807d8e6743f3824e8509beca625669633be0acbdf509dc" + [[package]] name = "rand_core" version = "0.5.1" @@ -1961,6 +1996,15 @@ dependencies = [ "rand_core 0.6.2", ] +[[package]] +name = "rdrand" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "678054eb77286b51581ba43620cc911abf02758c91f93f479767aed0f90458b2" +dependencies = [ + "rand_core 0.3.1", +] + [[package]] name = "redox_syscall" version = "0.1.57" @@ -2485,6 +2529,16 @@ dependencies = [ "unicode-xid", ] +[[package]] +name = "tempdir" +version = "0.3.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "15f2b5fb00ccdf689e0149d1b1b3c03fead81c2b37735d812fa8bddbbf41b6d8" +dependencies = [ + "rand 0.4.6", + "remove_dir_all", +] + [[package]] name = "tempfile" version = "3.2.0" diff --git a/Cargo.toml b/Cargo.toml index 2099319..14e4ee1 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -6,9 +6,9 @@ members = [ ] [patch.crates-io] -arrow = { git = "https://github.com/houqp/arrow.git", rev = "e5b567d3582ba9fa91b29ab1c944de97b491bacb" } -parquet = { git = "https://github.com/houqp/arrow.git", rev = "e5b567d3582ba9fa91b29ab1c944de97b491bacb" } -datafusion = { git = "https://github.com/houqp/arrow.git", rev = "e5b567d3582ba9fa91b29ab1c944de97b491bacb" } +arrow = { git = "https://github.com/houqp/arrow.git", rev = "555d12458a1c32301e0ce179143577f34fe78b6e" } +parquet = { git = "https://github.com/houqp/arrow.git", rev = "555d12458a1c32301e0ce179143577f34fe78b6e" } +datafusion = { git = "https://github.com/houqp/arrow.git", rev = "555d12458a1c32301e0ce179143577f34fe78b6e" } actix-web = { git = "https://github.com/actix/actix-web.git", rev = "b37669cb3b47eeb60c161ff12156d1c877af91a2" } actix-http = { git = "https://github.com/actix/actix-web.git", rev = "b37669cb3b47eeb60c161ff12156d1c877af91a2" } diff --git a/README.md b/README.md index e2bbc06..3a25258 100644 --- a/README.md +++ b/README.md @@ -242,7 +242,6 @@ Data layer: - [x] JSON - [x] parquet - [ ] xls, xlsx, xlsm, ods: https://github.com/tafia/calamine - - [ ] load multiple partitions from a folder Misc: - [ ] auto gen OpenApi doc for rest layer diff --git a/columnq/Cargo.toml b/columnq/Cargo.toml index 1ec6740..99331e6 100644 --- a/columnq/Cargo.toml +++ b/columnq/Cargo.toml @@ -32,6 +32,7 @@ tokio = "1" [dev-dependencies] anyhow = "1" serde_yaml = "0.8" +tempdir = "0" [features] simd = ["arrow/simd", "datafusion/simd"] diff --git a/columnq/src/io/mod.rs b/columnq/src/io/mod.rs index 1d8a748..c2ca484 100644 --- a/columnq/src/io/mod.rs +++ b/columnq/src/io/mod.rs @@ -1,29 +1,48 @@ use std::fs; +use datafusion::physical_plan::common::build_file_list; use uriparse::URIReference; use crate::error::ColumnQError; +use crate::table::TableSource; -pub fn partitions_from_fs_uri( - uri: &URIReference, +pub fn partitions_from_fs_uri<'a, F, T>( + t: &'a TableSource, + uri: URIReference<'a>, mut partition_reader: F, -) -> Result +) -> Result, ColumnQError> where F: FnMut(std::fs::File) -> Result, { - let reader = fs::File::open(uri.path().to_string()) - .map_err(|e| ColumnQError::FileStore(format!("open file error: {}", e)))?; - partition_reader(reader) + let fs_path = uri.path().to_string(); + let mut files = vec![]; + let mut file_ext = ".".to_string(); + file_ext.push_str(t.extension()?); + build_file_list(&fs_path, &mut files, &file_ext)?; + + // TODO: load partitions in parallel + let partitions = files + .iter() + .map(|fpath| { + let reader = fs::File::open(fpath) + .map_err(|e| ColumnQError::FileStore(format!("open file error: {}", e)))?; + + Ok(partition_reader(reader)?) + }) + .collect::, ColumnQError>>()?; + + Ok(partitions) } -pub async fn partitions_from_http_uri<'a, 'b, F, T>( - uri: &'a URIReference<'b>, +pub async fn partitions_from_http_uri<'a, F, T>( + t: &'a TableSource, + _uri: URIReference<'a>, mut partition_reader: F, -) -> Result +) -> Result, ColumnQError> where F: FnMut(std::io::Cursor) -> Result, { - let resp = reqwest::get(&uri.to_string()) + let resp = reqwest::get(&t.uri) .await .map_err(|e| ColumnQError::HttpStore(e.to_string()))?; if resp.status().as_u16() / 100 != 2 { @@ -35,5 +54,7 @@ where let reader = std::io::Cursor::new(resp.bytes().await.map_err(|e| { ColumnQError::HttpStore(format!("Failed to decode server response: {}", e)) })?); - partition_reader(reader) + + // HTTP store doesn't support directory listing, so we always only return a single partition + Ok(vec![partition_reader(reader)?]) } diff --git a/columnq/src/lib.rs b/columnq/src/lib.rs index bfe833a..cc7e052 100644 --- a/columnq/src/lib.rs +++ b/columnq/src/lib.rs @@ -3,23 +3,25 @@ extern crate lazy_static; pub mod error; -macro_rules! with_reader_from_uri { - ($call_with_r:expr, $uri:ident) => { - match $uri.scheme() { +macro_rules! partitions_from_table_source { + ($table_source:ident, $call_with_r:expr) => {{ + let uri = $table_source.parsed_uri()?; + + match uri.scheme() { // default to local file when schema is not provided None | Some(uriparse::Scheme::FileSystem) => { - crate::io::partitions_from_fs_uri(&$uri, $call_with_r) + crate::io::partitions_from_fs_uri(&$table_source, uri, $call_with_r) } Some(uriparse::Scheme::HTTP) | Some(uriparse::Scheme::HTTPS) => { - crate::io::partitions_from_http_uri(&$uri, $call_with_r).await + crate::io::partitions_from_http_uri(&$table_source, uri, $call_with_r).await } // "s3" => {} _ => Err(ColumnQError::InvalidUri(format!( "Unsupported scheme in table uri: {:?}", - $uri + $table_source.uri, ))), } - }; + }}; } pub mod columnq; diff --git a/columnq/src/table/csv.rs b/columnq/src/table/csv.rs index 23784b6..80c4347 100644 --- a/columnq/src/table/csv.rs +++ b/columnq/src/table/csv.rs @@ -1,5 +1,6 @@ use std::sync::Arc; +use arrow::datatypes::Schema; use arrow::record_batch::RecordBatch; use crate::error::ColumnQError; @@ -14,26 +15,30 @@ pub async fn to_mem_table( let batch_size = 1024; let projection = None; - let uri = t.parsed_uri()?; - let schema_ref: arrow::datatypes::SchemaRef = match &t.schema { Some(s) => Arc::new(s.into()), None => { - let schema = with_reader_from_uri!( - |mut r| { - Ok(arrow::csv::reader::infer_schema_from_reader( - &mut r, delimiter, None, has_header, - )? - .0) - }, - uri - )?; - Arc::new(schema) + let schemas = partitions_from_table_source!(t, |mut r| { + let (schema, record_count) = arrow::csv::reader::infer_schema_from_reader( + &mut r, delimiter, None, has_header, + )?; + + if record_count > 0 { + Ok(Some(schema)) + } else { + Ok(None) + } + })? + .into_iter() + .filter_map(|e| e) + .collect::>(); + + Arc::new(Schema::try_merge(schemas)?) } }; - let batches: Vec = with_reader_from_uri!( - |r| -> Result, ColumnQError> { + let partitions: Vec> = + partitions_from_table_source!(t, |r| -> Result, ColumnQError> { let csv_reader = arrow::csv::Reader::new( r, schema_ref.clone(), @@ -48,12 +53,44 @@ pub async fn to_mem_table( .into_iter() .map(|batch| Ok(batch?)) .collect::, ColumnQError>>() - }, - uri - )?; + })?; - let partitions = vec![batches]; Ok(datafusion::datasource::MemTable::try_new( schema_ref, partitions, )?) } + +#[cfg(test)] +mod tests { + use super::*; + + use std::fs; + + use datafusion::datasource::TableProvider; + + use crate::table::TableLoadOption; + use crate::test_util::*; + + #[tokio::test] + async fn load_partitions() -> anyhow::Result<()> { + let tmp_dir = tempdir::TempDir::new("columnq.test.csv_partitions")?; + let tmp_dir_path = tmp_dir.path(); + + let source_path = test_data_path("uk_cities_with_headers.csv"); + assert!(fs::copy(&source_path, tmp_dir_path.join("2020-01-01.csv"))? > 0); + assert!(fs::copy(&source_path, tmp_dir_path.join("2020-01-02.csv"))? > 0); + assert!(fs::copy(&source_path, tmp_dir_path.join("2020-01-03.csv"))? > 0); + + let t = to_mem_table(&TableSource { + name: "uk_cities".to_string(), + uri: tmp_dir_path.to_string_lossy().to_string(), + schema: None, + option: Some(TableLoadOption::csv {}), + }) + .await?; + + assert_eq!(t.statistics().num_rows, Some(37 * 3)); + + Ok(()) + } +} diff --git a/columnq/src/table/json.rs b/columnq/src/table/json.rs index ceeae4d..1a9404f 100644 --- a/columnq/src/table/json.rs +++ b/columnq/src/table/json.rs @@ -1,23 +1,22 @@ use std::io::Read; use std::sync::Arc; +use arrow::datatypes::Schema; +use arrow::record_batch::RecordBatch; use serde_json::value::Value; -use uriparse::URIReference; use crate::error::ColumnQError; -use crate::table::{TableLoadOption, TableSource}; +use crate::table::{TableLoadOption, TableSchema, TableSource}; fn json_value_from_reader(r: R) -> Result { serde_json::from_reader(r).map_err(ColumnQError::json_parse) } -async fn load_array_by_path<'a>( - uri: URIReference<'a>, +fn json_partition_to_vec<'a>( + json_partition: &Value, pointer: Option<&'a str>, ) -> Result, ColumnQError> { - let payload: Value = with_reader_from_uri!(json_value_from_reader, uri)?; - - let mut value_ref: &Value = &payload; + let mut value_ref = json_partition; if let Some(p) = pointer { match value_ref.pointer(p) { @@ -40,6 +39,67 @@ async fn load_array_by_path<'a>( } } +fn json_vec_to_partition( + json_rows: Vec, + provided_schema: &Option, + batch_size: usize, + array_encoded: bool, +) -> Result<(arrow::datatypes::Schema, Vec), ColumnQError> { + // load schema + let schema = match provided_schema { + Some(s) => s.into(), + None => arrow::json::reader::infer_json_schema_from_iterator( + json_rows.iter().map(|v| Ok(v.clone())), + ) + .map_err(|e| { + ColumnQError::LoadJson(format!("Failed to infer schema from JSON data: {}", e)) + })?, + }; + + // decode to arrow record batch + let decoder = arrow::json::reader::Decoder::new(Arc::new(schema.clone()), batch_size, None); + let batch = { + // enclose values_iter in its own scope so it won't brrow schema_ref til end of this + // function + let mut values_iter: Box>>; + values_iter = if array_encoded { + // convert row array to object based on schema + // TODO: support array_encoded read in upstream arrow json reader instead + Box::new(json_rows.into_iter().map(|json_row| { + let mut m = serde_json::map::Map::new(); + schema.fields().iter().enumerate().try_for_each(|(i, f)| { + match json_row.get(i) { + Some(x) => { + m.insert(f.name().to_string(), x.clone()); + Ok(()) + } + None => Err(arrow::error::ArrowError::JsonError(format!( + "arry encoded JSON row missing column {:?} : {:?}", + i, json_row + ))), + } + })?; + Ok(Value::Object(m)) + })) + } else { + // no need to convert row since each row is already an object + Box::new(json_rows.into_iter().map(Ok)) + }; + + // decode whole array into single record batch + decoder + .next_batch(&mut values_iter) + .map_err(|e| { + ColumnQError::LoadJson(format!("Failed decode JSON into Arrow record batch: {}", e)) + })? + .ok_or_else(|| { + ColumnQError::LoadJson("JSON data results in empty arrow record batch".to_string()) + })? + }; + + Ok((schema, vec![batch])) +} + pub async fn to_mem_table( t: &TableSource, ) -> Result { @@ -61,82 +121,47 @@ pub async fn to_mem_table( _ => None, }; - // load array from file - let json_rows = load_array_by_path(t.parsed_uri()?, pointer.as_deref()).await?; + let mut merged_schema: Option = None; + let json_partitions: Vec = partitions_from_table_source!(t, json_value_from_reader)?; - if json_rows.is_empty() { - match pointer { - Some(p) => { - return Err(ColumnQError::LoadJson(format!( - "{} points to an emtpy array", - p - ))); + let partitions = json_partitions + .iter() + .map(|json_partition| { + let json_rows = json_partition_to_vec(json_partition, pointer.as_deref())?; + if json_rows.is_empty() { + match &pointer { + Some(p) => { + return Err(ColumnQError::LoadJson(format!( + "{} points to an emtpy array", + p + ))); + } + None => { + return Err(ColumnQError::LoadJson( + "JSON data is an emtpy array".to_string(), + )); + } + } } - None => { - return Err(ColumnQError::LoadJson( - "JSON data is an emtpy array".to_string(), - )); - } - } - } - // load schema - let schema_ref: arrow::datatypes::SchemaRef = match &t.schema { - Some(s) => Arc::new(s.into()), - None => arrow::json::reader::infer_json_schema_from_iterator( - json_rows.iter().map(|v| Ok(v.clone())), - ) - .map_err(|e| { - ColumnQError::LoadJson(format!("Failed to infer schema from JSON data: {}", e)) - })?, - }; + let (batch_schema, partition) = + json_vec_to_partition(json_rows, &t.schema, batch_size, array_encoded)?; - // decode to arrow record batch - let decoder = arrow::json::reader::Decoder::new(schema_ref.clone(), batch_size, None); - let batch = { - // enclose values_iter in its own scope so it won't brrow schema_ref til end of this - // function - let mut values_iter: Box>>; - values_iter = if array_encoded { - // convert row array to object based on schema - // TODO: support array_encoded read in arrow json reader instead - Box::new(json_rows.into_iter().map(|json_row| { - let mut m = serde_json::map::Map::new(); - schema_ref - .fields() - .iter() - .enumerate() - .try_for_each(|(i, f)| match json_row.get(i) { - Some(x) => { - m.insert(f.name().to_string(), x.clone()); - Ok(()) - } - None => Err(arrow::error::ArrowError::JsonError(format!( - "arry encoded JSON row missing column {:?} : {:?}", - i, json_row - ))), - })?; - Ok(Value::Object(m)) - })) - } else { - // no need to convert row since each row is already an object - Box::new(json_rows.into_iter().map(Ok)) - }; + merged_schema = Some(match &merged_schema { + Some(s) if s != &batch_schema => Schema::try_merge(vec![s.clone(), batch_schema])?, + _ => batch_schema, + }); - // decode whole array into single record batch - decoder - .next_batch(&mut values_iter) - .map_err(|e| { - ColumnQError::LoadJson(format!("Failed decode JSON into Arrow record batch: {}", e)) - })? - .ok_or_else(|| { - ColumnQError::LoadJson("JSON data results in empty arrow record batch".to_string()) - })? - }; - let partitions = vec![vec![batch]]; + Ok(partition) + }) + .collect::>, ColumnQError>>()?; Ok(datafusion::datasource::MemTable::try_new( - schema_ref, partitions, + Arc::new( + merged_schema + .ok_or_else(|| ColumnQError::LoadJson("failed to load schema".to_string()))?, + ), + partitions, )?) } @@ -194,7 +219,7 @@ mod tests { ]; expected_obj_keys.sort(); - assert_eq!(obj_keys, expected_obj_keys,); + assert_eq!(obj_keys, expected_obj_keys); Ok(()) } diff --git a/columnq/src/table/mod.rs b/columnq/src/table/mod.rs index a484326..47b4830 100644 --- a/columnq/src/table/mod.rs +++ b/columnq/src/table/mod.rs @@ -69,6 +69,15 @@ impl TableLoadOption { )), } } + + pub fn extension<'a>(&'a self) -> &'static str { + match self { + TableLoadOption::json { .. } => "json", + TableLoadOption::csv { .. } => "csv", + TableLoadOption::parquet { .. } => "parquet", + TableLoadOption::google_spreadsheet(_) => "gsheet", + } + } } #[derive(Deserialize, Clone)] @@ -82,6 +91,7 @@ pub struct TableSource { impl TableSource { pub fn new(name: String, uri: String) -> Self { + // TODO: parse table format from uri during initializeion? Self { name, uri, @@ -94,10 +104,36 @@ impl TableSource { URIReference::try_from(self.uri.as_str()) .map_err(|_| ColumnQError::InvalidUri(self.uri.clone())) } + + pub fn extension<'a>(&'a self) -> Result<&'a str, ColumnQError> { + Ok(match &self.option { + Some(opt) => opt.extension(), + None => { + let ext = Path::new(&self.uri) + .extension() + .and_then(OsStr::to_str) + .ok_or_else(|| { + ColumnQError::InvalidUri(format!( + "cannot detect table extension from uri: {}", + self.uri + )) + })?; + + match ext { + "csv" | "json" | "parquet" => ext, + _ => { + return Err(ColumnQError::InvalidUri(format!( + "unsupported extension in uri: {}", + self.uri + ))); + } + } + } + }) + } } pub async fn load(t: &TableSource) -> Result { - // TODO: support reading list of files within directory if let Some(opt) = &t.option { return Ok(match opt { TableLoadOption::json { .. } => json::to_mem_table(t).await?, diff --git a/columnq/src/table/parquet.rs b/columnq/src/table/parquet.rs index 4b94310..1804d2f 100644 --- a/columnq/src/table/parquet.rs +++ b/columnq/src/table/parquet.rs @@ -18,10 +18,8 @@ pub async fn to_mem_table( let mut schema: Option = None; - let uri = t.parsed_uri()?; - - let partition: Vec = with_reader_from_uri!( - |mut r| -> Result, ColumnQError> { + let partitions: Vec> = + partitions_from_table_source!(t, |mut r| -> Result, ColumnQError> { // TODO: this is very inefficient, we are copying the parquet data in memory twice when // it's being fetched from http store let mut buffer = Vec::new(); @@ -41,35 +39,35 @@ pub async fn to_mem_table( ColumnQError::LoadParquet("failed to load schema from partition".to_string()) })?; schema = Some(match &schema { - Some(s) => Schema::try_merge(vec![s.clone(), batch_schema])?, - None => batch_schema, + Some(s) if s != &batch_schema => Schema::try_merge(vec![s.clone(), batch_schema])?, + _ => batch_schema, }); Ok(record_batch_reader .into_iter() .collect::>>()?) - }, - uri - )?; + })?; Ok(datafusion::datasource::MemTable::try_new( Arc::new( schema.ok_or_else(|| ColumnQError::LoadParquet("failed to load schema".to_string()))?, ), - vec![partition], + partitions, )?) } #[cfg(test)] mod tests { use super::*; + use std::fs; use datafusion::datasource::TableProvider; + use crate::table::TableLoadOption; use crate::test_util::*; #[tokio::test] - async fn simple_parquet_load() -> Result<(), ColumnQError> { + async fn load_simple_parquet() -> Result<(), ColumnQError> { let t = to_mem_table(&TableSource { name: "blogs".to_string(), uri: test_data_path("blogs.parquet"), @@ -91,4 +89,35 @@ mod tests { Ok(()) } + + #[tokio::test] + async fn load_partitions() -> anyhow::Result<()> { + let tmp_dir = tempdir::TempDir::new("columnq.test.parquet_partitions")?; + let tmp_dir_path = tmp_dir.path(); + + let source_path = test_data_path("blogs.parquet"); + assert!(fs::copy(&source_path, tmp_dir_path.join("2020-01-01.parquet"))? > 0); + assert!(fs::copy(&source_path, tmp_dir_path.join("2020-01-02.parquet"))? > 0); + assert!(fs::copy(&source_path, tmp_dir_path.join("2020-01-03.parquet"))? > 0); + + let t = to_mem_table(&TableSource { + name: "blogs".to_string(), + uri: tmp_dir_path.to_string_lossy().to_string(), + schema: None, + option: Some(TableLoadOption::parquet {}), + }) + .await?; + + assert_eq!( + t.schema() + .metadata() + .get("writer.model.name") + .map(|s| s.as_str()), + Some("protobuf") + ); + + assert_eq!(t.statistics().num_rows, Some(1500)); + + Ok(()) + } }