From 528f89efc910dd5583c0f892518a90c0bb16d2d1 Mon Sep 17 00:00:00 2001 From: QP Hou Date: Thu, 27 Jan 2022 20:14:11 -0800 Subject: [PATCH] fix json table partition loading (#133) --- columnq-cli/Cargo.toml | 2 +- columnq/Cargo.toml | 2 +- columnq/src/table/json.rs | 46 +++++++++++++++++++++++++++------------ roapi-http/Cargo.toml | 2 +- 4 files changed, 35 insertions(+), 17 deletions(-) diff --git a/columnq-cli/Cargo.toml b/columnq-cli/Cargo.toml index f756522..a9601d4 100644 --- a/columnq-cli/Cargo.toml +++ b/columnq-cli/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "columnq-cli" -version = "0.2.1" +version = "0.2.2" homepage = "https://github.com/roapi/roapi" license = "MIT" readme = "README.md" diff --git a/columnq/Cargo.toml b/columnq/Cargo.toml index 73c1c31..d437afe 100644 --- a/columnq/Cargo.toml +++ b/columnq/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "columnq" -version = "0.4.3" +version = "0.4.4" homepage = "https://github.com/roapi/roapi" license = "MIT" authors = ["QP Hou "] diff --git a/columnq/src/table/json.rs b/columnq/src/table/json.rs index 9efb4d4..6ab46d6 100644 --- a/columnq/src/table/json.rs +++ b/columnq/src/table/json.rs @@ -60,7 +60,8 @@ fn json_vec_to_partition( // decode to arrow record batch let decoder = arrow::json::reader::Decoder::new(Arc::new(schema.clone()), batch_size, None); - let batch = { + let mut batches = vec![]; + { // enclose values_iter in its own scope so it won't brrow schema_ref til end of this // function let mut values_iter: Box>>; @@ -88,23 +89,19 @@ fn json_vec_to_partition( Box::new(json_rows.into_iter().map(Ok)) }; - // decode whole array into single record batch - decoder - .next_batch(&mut values_iter) - .map_err(|e| { - ColumnQError::LoadJson(format!("Failed decode JSON into Arrow record batch: {}", e)) - })? - .ok_or_else(|| { - ColumnQError::LoadJson("JSON data results in empty arrow record batch".to_string()) - })? - }; + while let Some(batch) = decoder.next_batch(&mut values_iter).map_err(|e| { + ColumnQError::LoadJson(format!("Failed decode JSON into Arrow record batch: {}", e)) + })? { + batches.push(batch); + } + } - Ok((schema, vec![batch])) + Ok((schema, batches)) } -pub async fn to_mem_table( +async fn to_partitions( t: &TableSource, -) -> Result { +) -> Result<(Option, Vec>), ColumnQError> { let batch_size = t.batch_size; let array_encoded = match &t.option { Some(TableLoadOption::json { array_encoded, .. }) => array_encoded.unwrap_or(false), @@ -157,6 +154,13 @@ pub async fn to_mem_table( }) .collect::>, ColumnQError>>()?; + Ok((merged_schema, partitions)) +} + +pub async fn to_mem_table( + t: &TableSource, +) -> Result { + let (merged_schema, partitions) = to_partitions(t).await?; Ok(datafusion::datasource::MemTable::try_new( Arc::new( merged_schema @@ -222,4 +226,18 @@ mod tests { Ok(()) } + + #[tokio::test] + async fn test_multiple_batches() -> Result<(), ColumnQError> { + let mut source = TableSource::new( + "spacex_launches".to_string(), + test_data_path("spacex_launches.json"), + ); + source.batch_size = 1; + let (_, p) = to_partitions(&source).await?; + assert_eq!(p.len(), 1); + assert_eq!(p[0][0].num_rows(), source.batch_size); + assert_eq!(p[0].len(), 132); + Ok(()) + } } diff --git a/roapi-http/Cargo.toml b/roapi-http/Cargo.toml index 9b3962e..d160b01 100644 --- a/roapi-http/Cargo.toml +++ b/roapi-http/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "roapi-http" -version = "0.5.2" +version = "0.5.3" authors = ["QP Hou "] homepage = "https://github.com/roapi/roapi" license = "MIT"