csv use_memory_table option added(resolves issue #224) (#225)

* csv use_memory_table option added

* fixes issue #224
This commit is contained in:
Akshith Madhur 2022-11-22 12:55:36 +05:30 committed by GitHub
parent 1467739e25
commit 026382ecf0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 43 additions and 7 deletions

View File

@ -3,14 +3,43 @@ use std::sync::Arc;
use datafusion::arrow;
use datafusion::arrow::datatypes::Schema;
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::datasource::file_format::csv::CsvFormat;
use datafusion::datasource::listing::{
ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl,
};
use datafusion::datasource::TableProvider;
use datafusion::prelude::SessionContext;
use log::debug;
use crate::error::ColumnQError;
use crate::table::{TableLoadOption, TableOptionCsv, TableSource};
pub async fn to_datafusion_table(t: &TableSource) -> Result<Arc<dyn TableProvider>, ColumnQError> {
let opt = t
.option
.clone()
.unwrap_or_else(|| TableLoadOption::csv(TableOptionCsv::default()));
if opt.as_csv().unwrap().use_memory_table {
return to_mem_table(t).await;
}
let table_url = ListingTableUrl::parse(t.get_uri_str())?;
let options = ListingOptions::new(Arc::new(CsvFormat::default()));
let schemaref = match &t.schema {
Some(s) => Arc::new(s.into()),
None => {
let ctx = SessionContext::new();
options.infer_schema(&ctx.state(), &table_url).await?
}
};
let table_config = ListingTableConfig::new(table_url)
.with_listing_options(options)
.with_schema(schemaref);
Ok(Arc::new(ListingTable::try_new(table_config)?))
}
pub async fn to_mem_table(
t: &TableSource,
) -> Result<datafusion::datasource::MemTable, ColumnQError> {
) -> Result<Arc<dyn TableProvider>, ColumnQError> {
let opt = t
.option
.clone()
@ -65,16 +94,16 @@ pub async fn to_mem_table(
.collect::<Result<Vec<RecordBatch>, ColumnQError>>()
})?;
Ok(datafusion::datasource::MemTable::try_new(
let table = Arc::new(datafusion::datasource::MemTable::try_new(
schema_ref, partitions,
)?)
)?);
Ok(table)
}
#[cfg(test)]
mod tests {
use super::*;
use datafusion::datasource::TableProvider;
use datafusion::prelude::SessionContext;
use std::fs;
use tempfile::Builder;

View File

@ -91,6 +91,8 @@ pub struct TableOptionCsv {
delimiter: u8,
#[serde(default = "TableOptionCsv::default_projection")]
projection: Option<Vec<usize>>,
#[serde(default = "TableOptionCsv::default_use_memory_table")]
use_memory_table: bool
}
impl TableOptionCsv {
@ -135,6 +137,10 @@ impl TableOptionCsv {
)),
}
}
#[inline]
pub fn default_use_memory_table() -> bool {
true
}
}
impl Default for TableOptionCsv {
@ -143,6 +149,7 @@ impl Default for TableOptionCsv {
has_header: Self::default_has_header(),
delimiter: Self::default_delimiter(),
projection: Self::default_projection(),
use_memory_table: Self::default_use_memory_table()
}
}
}
@ -473,7 +480,7 @@ pub async fn load(t: &TableSource) -> Result<Arc<dyn TableProvider>, ColumnQErro
TableLoadOption::ndjson { .. } | TableLoadOption::jsonl { .. } => {
Arc::new(ndjson::to_mem_table(t).await?)
}
TableLoadOption::csv { .. } => Arc::new(csv::to_mem_table(t).await?),
TableLoadOption::csv { .. } => csv::to_datafusion_table(t).await?,
TableLoadOption::parquet { .. } => parquet::to_datafusion_table(t).await?,
TableLoadOption::google_spreadsheet(_) => {
Arc::new(google_spreadsheets::to_mem_table(t).await?)
@ -494,7 +501,7 @@ pub async fn load(t: &TableSource) -> Result<Arc<dyn TableProvider>, ColumnQErro
})
} else {
let t: Arc<dyn TableProvider> = match t.extension()? {
"csv" => Arc::new(csv::to_mem_table(t).await?),
"csv" => csv::to_datafusion_table(t).await?,
"json" => Arc::new(json::to_mem_table(t).await?),
"ndjson" | "jsonl" => Arc::new(ndjson::to_mem_table(t).await?),
"parquet" => parquet::to_datafusion_table(t).await?,