Lazy load parquet (#63)

* PoC for parquet: reading a table by registering parquet directly.

* Adding config flag and restoring existing _in-memory_ code path.

* Addressing review comment: separate `to_mem_table()`.

* Addressing review comment: default-able `LoadOptionParquet`.

* Adding test: make sure we instantiated `datafusion::datasource::ParquetTable`
This commit is contained in:
Thomas Peiselt 2021-09-03 21:36:18 +02:00 committed by GitHub
parent c79a35a3fc
commit 5ace8b8695
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 119 additions and 47 deletions

View File

@ -1,9 +1,8 @@
use std::collections::HashMap;
use std::sync::Arc;
use datafusion::arrow;
pub use datafusion::execution::context::ExecutionConfig;
use datafusion::{datasource::TableProvider, execution::context::ExecutionContext};
use datafusion::execution::context::ExecutionContext;
use crate::error::{ColumnQError, QueryError};
use crate::query;
@ -28,8 +27,7 @@ impl ColumnQ {
pub async fn load_table(&mut self, t: &TableSource) -> Result<(), ColumnQError> {
let table = table::load(t).await?;
self.schema_map.insert(t.name.clone(), table.schema());
self.dfctx
.register_table(t.name.as_str(), Arc::new(table))?;
self.dfctx.register_table(t.name.as_str(), table)?;
Ok(())
}

View File

@ -2,6 +2,9 @@ use std::convert::TryFrom;
use std::ffi::OsStr;
use std::io::Read;
use std::path::Path;
use std::sync::Arc;
use datafusion::datasource::TableProvider;
use datafusion::arrow;
use serde::de::{Deserialize, Deserializer};
@ -119,6 +122,27 @@ impl Default for TableOptionCsv {
}
}
#[derive(Deserialize, Debug, Clone, Eq, PartialEq)]
pub struct TableOptionParquet {
#[serde(default = "TableOptionParquet::default_use_memory_table")]
use_memory_table: bool,
}
impl TableOptionParquet {
#[inline]
pub fn default_use_memory_table() -> bool {
true
}
}
impl Default for TableOptionParquet {
fn default() -> Self {
Self {
use_memory_table: Self::default_use_memory_table(),
}
}
}
// Adding new table format:
// * update TableLoadOption enum to add the new variant
// * update TableLoadOption.extension
@ -137,7 +161,7 @@ pub enum TableLoadOption {
},
csv(TableOptionCsv),
ndjson {},
parquet {},
parquet(TableOptionParquet),
google_spreadsheet(TableOptionGoogleSpreasheet),
delta {},
}
@ -159,6 +183,13 @@ impl TableLoadOption {
}
}
fn as_parquet(&self) -> Result<&TableOptionParquet, ColumnQError> {
match self {
Self::parquet(opt) => Ok(opt),
_ => Err(ColumnQError::ExpectFormatOption("parquet".to_string())),
}
}
pub fn extension<'a>(&'a self) -> &'static str {
match self {
Self::json { .. } => "json",
@ -305,33 +336,34 @@ impl TableSource {
}
}
pub async fn load(t: &TableSource) -> Result<datafusion::datasource::MemTable, ColumnQError> {
pub async fn load(t: &TableSource) -> Result<Arc<dyn TableProvider>, ColumnQError> {
if let Some(opt) = &t.option {
return Ok(match opt {
TableLoadOption::json { .. } => json::to_mem_table(t).await?,
TableLoadOption::ndjson { .. } => ndjson::to_mem_table(t).await?,
TableLoadOption::csv { .. } => csv::to_mem_table(t).await?,
TableLoadOption::parquet { .. } => parquet::to_mem_table(t).await?,
TableLoadOption::google_spreadsheet(_) => google_spreadsheets::to_mem_table(t).await?,
TableLoadOption::delta { .. } => delta::to_mem_table(t).await?,
});
Ok(match opt {
TableLoadOption::json { .. } => Arc::new(json::to_mem_table(t).await?),
TableLoadOption::ndjson { .. } => Arc::new(ndjson::to_mem_table(t).await?),
TableLoadOption::csv { .. } => Arc::new(csv::to_mem_table(t).await?),
TableLoadOption::parquet { .. } => parquet::to_datafusion_table(t).await?,
TableLoadOption::google_spreadsheet(_) => {
Arc::new(google_spreadsheets::to_mem_table(t).await?)
}
TableLoadOption::delta { .. } => Arc::new(delta::to_mem_table(t).await?),
})
} else {
let t: Arc<dyn TableProvider> = match t.extension()? {
"csv" => Arc::new(csv::to_mem_table(t).await?),
"json" => Arc::new(json::to_mem_table(t).await?),
"ndjson" => Arc::new(ndjson::to_mem_table(t).await?),
"parquet" => parquet::to_datafusion_table(t).await?,
ext => {
return Err(ColumnQError::InvalidUri(format!(
"failed to register `{}` as table `{}`, unsupported table format `{}`",
t.io_source, t.name, ext,
)));
}
};
Ok(t)
}
// no format specified explictly, try to guess from file path
let t = match t.extension()? {
"csv" => csv::to_mem_table(t).await?,
"json" => json::to_mem_table(t).await?,
"ndjson" => ndjson::to_mem_table(t).await?,
"parquet" => parquet::to_mem_table(t).await?,
ext => {
return Err(ColumnQError::InvalidUri(format!(
"failed to register `{}` as table `{}`, unsupported table format `{}`",
t.io_source, t.name, ext,
)));
}
};
Ok(t)
}
/// For parsing table URI arg in CLI

View File

@ -1,19 +1,37 @@
use std::io::Read;
use std::sync::Arc;
use crate::error::ColumnQError;
use crate::table::{TableLoadOption, TableOptionParquet, TableSource};
use datafusion::arrow;
use datafusion::arrow::datatypes::Schema;
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::datasource::parquet::ParquetTable;
use datafusion::datasource::TableProvider;
use datafusion::parquet::arrow::{ArrowReader, ParquetFileArrowReader};
use datafusion::parquet::file::reader::SerializedFileReader;
use datafusion::parquet::file::serialized_reader::SliceableCursor;
use crate::error::ColumnQError;
use crate::table::TableSource;
pub async fn to_datafusion_table(t: &TableSource) -> Result<Arc<dyn TableProvider>, ColumnQError> {
let opt = t
.option
.clone()
.unwrap_or_else(|| TableLoadOption::parquet(TableOptionParquet::default()));
let TableOptionParquet { use_memory_table } = opt.as_parquet()?;
pub async fn to_mem_table(
t: &TableSource,
) -> Result<datafusion::datasource::MemTable, ColumnQError> {
if *use_memory_table {
to_mem_table(t).await
} else {
Ok(Arc::new(
ParquetTable::try_new(t.parsed_uri()?, 4).map_err(|err| {
ColumnQError::LoadParquet(format!("failed to load parquet: '{}'", err.to_string()))
})?,
))
}
}
pub async fn to_mem_table(t: &TableSource) -> Result<Arc<dyn TableProvider>, ColumnQError> {
// TODO: make batch size configurable
let batch_size = 1024;
@ -56,14 +74,15 @@ pub async fn to_mem_table(
));
}
Ok(datafusion::datasource::MemTable::try_new(
Arc::new(
schema.ok_or_else(|| {
let table =
Arc::new(datafusion::datasource::MemTable::try_new(
Arc::new(schema.ok_or_else(|| {
ColumnQError::LoadParquet("schema not found for table".to_string())
})?,
),
partitions,
)?)
})?),
partitions,
)?);
Ok(table)
}
#[cfg(test)]
@ -71,11 +90,34 @@ mod tests {
use super::*;
use std::fs;
use datafusion::datasource::TableProvider;
use crate::table::TableLoadOption;
use crate::test_util::*;
#[tokio::test]
async fn load_flattened_parquet() -> Result<(), ColumnQError> {
let t = to_datafusion_table(
&TableSource::new(
"blogs".to_string(),
test_data_path("blogs_flattened.parquet"),
)
.with_option(TableLoadOption::parquet(TableOptionParquet {
use_memory_table: false,
})),
)
.await?;
assert_eq!(t.statistics().num_rows, Some(500));
let stats = t.statistics().column_statistics.unwrap();
assert_eq!(stats[0].null_count, Some(245));
assert_eq!(stats[1].null_count, Some(373));
assert_eq!(stats[2].null_count, Some(237));
match t.as_any().downcast_ref::<ParquetTable>() {
Some(_) => Ok(()),
None => panic!("not read a datafusion::ParquetTable"),
}
}
#[tokio::test]
async fn load_simple_parquet() -> Result<(), ColumnQError> {
let t = to_mem_table(&TableSource::new(
@ -110,7 +152,7 @@ mod tests {
let t = to_mem_table(
&TableSource::new_with_uri("blogs", tmp_dir_path.to_string_lossy())
.with_option(TableLoadOption::parquet {}),
.with_option(TableLoadOption::parquet(TableOptionParquet::default())),
)
.await?;

View File

@ -89,7 +89,7 @@ fn properties_table() -> anyhow::Result<MemTable> {
Ok(MemTable::try_new(schema, vec![vec![record_batch]])?)
}
async fn ubuntu_ami_table() -> anyhow::Result<MemTable> {
async fn ubuntu_ami_table() -> anyhow::Result<Arc<dyn datafusion::datasource::TableProvider>> {
let mut table_source: table::TableSource = serde_yaml::from_str(
r#"
name: "ubuntu_ami"
@ -131,7 +131,7 @@ pub fn register_table_properties(dfctx: &mut ExecutionContext) -> anyhow::Result
}
pub async fn register_table_ubuntu_ami(dfctx: &mut ExecutionContext) -> anyhow::Result<()> {
dfctx.register_table("ubuntu_ami", Arc::new(ubuntu_ami_table().await?))?;
dfctx.register_table("ubuntu_ami", ubuntu_ami_table().await?)?;
Ok(())
}

Binary file not shown.