fix json table partition loading (#133)

This commit is contained in:
QP Hou 2022-01-27 20:14:11 -08:00 committed by GitHub
parent badaecdf17
commit 528f89efc9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 35 additions and 17 deletions

View File

@ -1,6 +1,6 @@
[package]
name = "columnq-cli"
version = "0.2.1"
version = "0.2.2"
homepage = "https://github.com/roapi/roapi"
license = "MIT"
readme = "README.md"

View File

@ -1,6 +1,6 @@
[package]
name = "columnq"
version = "0.4.3"
version = "0.4.4"
homepage = "https://github.com/roapi/roapi"
license = "MIT"
authors = ["QP Hou <dave2008713@gmail.com>"]

View File

@ -60,7 +60,8 @@ fn json_vec_to_partition(
// decode to arrow record batch
let decoder = arrow::json::reader::Decoder::new(Arc::new(schema.clone()), batch_size, None);
let batch = {
let mut batches = vec![];
{
// enclose values_iter in its own scope so it won't brrow schema_ref til end of this
// function
let mut values_iter: Box<dyn Iterator<Item = arrow::error::Result<Value>>>;
@ -88,23 +89,19 @@ fn json_vec_to_partition(
Box::new(json_rows.into_iter().map(Ok))
};
// decode whole array into single record batch
decoder
.next_batch(&mut values_iter)
.map_err(|e| {
ColumnQError::LoadJson(format!("Failed decode JSON into Arrow record batch: {}", e))
})?
.ok_or_else(|| {
ColumnQError::LoadJson("JSON data results in empty arrow record batch".to_string())
})?
};
while let Some(batch) = decoder.next_batch(&mut values_iter).map_err(|e| {
ColumnQError::LoadJson(format!("Failed decode JSON into Arrow record batch: {}", e))
})? {
batches.push(batch);
}
}
Ok((schema, vec![batch]))
Ok((schema, batches))
}
pub async fn to_mem_table(
async fn to_partitions(
t: &TableSource,
) -> Result<datafusion::datasource::MemTable, ColumnQError> {
) -> Result<(Option<Schema>, Vec<Vec<RecordBatch>>), ColumnQError> {
let batch_size = t.batch_size;
let array_encoded = match &t.option {
Some(TableLoadOption::json { array_encoded, .. }) => array_encoded.unwrap_or(false),
@ -157,6 +154,13 @@ pub async fn to_mem_table(
})
.collect::<Result<Vec<Vec<RecordBatch>>, ColumnQError>>()?;
Ok((merged_schema, partitions))
}
pub async fn to_mem_table(
t: &TableSource,
) -> Result<datafusion::datasource::MemTable, ColumnQError> {
let (merged_schema, partitions) = to_partitions(t).await?;
Ok(datafusion::datasource::MemTable::try_new(
Arc::new(
merged_schema
@ -222,4 +226,18 @@ mod tests {
Ok(())
}
#[tokio::test]
async fn test_multiple_batches() -> Result<(), ColumnQError> {
let mut source = TableSource::new(
"spacex_launches".to_string(),
test_data_path("spacex_launches.json"),
);
source.batch_size = 1;
let (_, p) = to_partitions(&source).await?;
assert_eq!(p.len(), 1);
assert_eq!(p[0][0].num_rows(), source.batch_size);
assert_eq!(p[0].len(), 132);
Ok(())
}
}

View File

@ -1,6 +1,6 @@
[package]
name = "roapi-http"
version = "0.5.2"
version = "0.5.3"
authors = ["QP Hou <dave2008713@gmail.com>"]
homepage = "https://github.com/roapi/roapi"
license = "MIT"