fixed csv-s3 issue (#268)

Co-authored-by: Elliot Alderson <elliot@Elliots-MacBook-Pro.local>
This commit is contained in:
Akshith Madhur 2023-03-15 12:05:02 +05:30 committed by GitHub
parent 51e01ef968
commit 4cd9360fb4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 7 additions and 8 deletions

View File

@ -8,13 +8,15 @@ use datafusion::datasource::listing::{
ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl,
};
use datafusion::datasource::TableProvider;
use datafusion::prelude::SessionContext;
use log::debug;
use crate::error::ColumnQError;
use crate::table::{TableLoadOption, TableOptionCsv, TableSource};
pub async fn to_datafusion_table(t: &TableSource) -> Result<Arc<dyn TableProvider>, ColumnQError> {
pub async fn to_datafusion_table(
t: &TableSource,
dfctx: &datafusion::execution::context::SessionContext,
) -> Result<Arc<dyn TableProvider>, ColumnQError> {
let opt = t
.option
.clone()
@ -26,10 +28,7 @@ pub async fn to_datafusion_table(t: &TableSource) -> Result<Arc<dyn TableProvide
let options = ListingOptions::new(Arc::new(CsvFormat::default()));
let schemaref = match &t.schema {
Some(s) => Arc::new(s.into()),
None => {
let ctx = SessionContext::new();
options.infer_schema(&ctx.state(), &table_url).await?
}
None => options.infer_schema(&dfctx.state(), &table_url).await?,
};
let table_config = ListingTableConfig::new(table_url)

View File

@ -477,7 +477,7 @@ pub async fn load(
TableLoadOption::ndjson { .. } | TableLoadOption::jsonl { .. } => {
Arc::new(ndjson::to_mem_table(t).await?)
}
TableLoadOption::csv { .. } => csv::to_datafusion_table(t).await?,
TableLoadOption::csv { .. } => csv::to_datafusion_table(t, dfctx).await?,
TableLoadOption::parquet { .. } => parquet::to_datafusion_table(t, dfctx).await?,
TableLoadOption::google_spreadsheet(_) => {
Arc::new(google_spreadsheets::to_mem_table(t).await?)
@ -498,7 +498,7 @@ pub async fn load(
})
} else {
let t: Arc<dyn TableProvider> = match t.extension()? {
"csv" => csv::to_datafusion_table(t).await?,
"csv" => csv::to_datafusion_table(t, dfctx).await?,
"json" => Arc::new(json::to_mem_table(t).await?),
"ndjson" | "jsonl" => Arc::new(ndjson::to_mem_table(t).await?),
"parquet" => parquet::to_datafusion_table(t, dfctx).await?,