From 4cd9360fb4220a2e1741b24752fb0c3f432d26e0 Mon Sep 17 00:00:00 2001 From: Akshith Madhur <84667163+elliot14A@users.noreply.github.com> Date: Wed, 15 Mar 2023 12:05:02 +0530 Subject: [PATCH] fixed csv-s3 issue (#268) Co-authored-by: Elliot Alderson --- columnq/src/table/csv.rs | 11 +++++------ columnq/src/table/mod.rs | 4 ++-- 2 files changed, 7 insertions(+), 8 deletions(-) diff --git a/columnq/src/table/csv.rs b/columnq/src/table/csv.rs index 83b10d7..aff0307 100644 --- a/columnq/src/table/csv.rs +++ b/columnq/src/table/csv.rs @@ -8,13 +8,15 @@ use datafusion::datasource::listing::{ ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl, }; use datafusion::datasource::TableProvider; -use datafusion::prelude::SessionContext; use log::debug; use crate::error::ColumnQError; use crate::table::{TableLoadOption, TableOptionCsv, TableSource}; -pub async fn to_datafusion_table(t: &TableSource) -> Result, ColumnQError> { +pub async fn to_datafusion_table( + t: &TableSource, + dfctx: &datafusion::execution::context::SessionContext, +) -> Result, ColumnQError> { let opt = t .option .clone() @@ -26,10 +28,7 @@ pub async fn to_datafusion_table(t: &TableSource) -> Result Arc::new(s.into()), - None => { - let ctx = SessionContext::new(); - options.infer_schema(&ctx.state(), &table_url).await? - } + None => options.infer_schema(&dfctx.state(), &table_url).await?, }; let table_config = ListingTableConfig::new(table_url) diff --git a/columnq/src/table/mod.rs b/columnq/src/table/mod.rs index 404faf0..6d8ffcb 100644 --- a/columnq/src/table/mod.rs +++ b/columnq/src/table/mod.rs @@ -477,7 +477,7 @@ pub async fn load( TableLoadOption::ndjson { .. } | TableLoadOption::jsonl { .. } => { Arc::new(ndjson::to_mem_table(t).await?) } - TableLoadOption::csv { .. } => csv::to_datafusion_table(t).await?, + TableLoadOption::csv { .. } => csv::to_datafusion_table(t, dfctx).await?, TableLoadOption::parquet { .. } => parquet::to_datafusion_table(t, dfctx).await?, TableLoadOption::google_spreadsheet(_) => { Arc::new(google_spreadsheets::to_mem_table(t).await?) @@ -498,7 +498,7 @@ pub async fn load( }) } else { let t: Arc = match t.extension()? { - "csv" => csv::to_datafusion_table(t).await?, + "csv" => csv::to_datafusion_table(t, dfctx).await?, "json" => Arc::new(json::to_mem_table(t).await?), "ndjson" | "jsonl" => Arc::new(ndjson::to_mem_table(t).await?), "parquet" => parquet::to_datafusion_table(t, dfctx).await?,