diff --git a/columnq/src/table/csv.rs b/columnq/src/table/csv.rs index 3d86327..e48cd8d 100644 --- a/columnq/src/table/csv.rs +++ b/columnq/src/table/csv.rs @@ -48,7 +48,11 @@ pub async fn to_datafusion_table( ListingTableUrl::parse(t.get_uri_str()).with_context(|_| table::ListingTableUriSnafu { uri: t.get_uri_str().to_string(), })?; - let options = ListingOptions::new(Arc::new(CsvFormat::default())); + let mut options = ListingOptions::new(Arc::new(CsvFormat::default())); + if let Some(partition_cols) = t.datafusion_partition_cols() { + options = options.with_table_partition_cols(partition_cols) + } + let schemaref = match &t.schema { Some(s) => Arc::new(s.into()), None => options @@ -64,6 +68,7 @@ pub async fn to_datafusion_table( ListingTable::try_new(table_config).context(table::CreateListingTableSnafu)?, )) } + pub async fn to_mem_table( t: &TableSource, dfctx: &datafusion::execution::context::SessionContext, diff --git a/columnq/src/table/mod.rs b/columnq/src/table/mod.rs index 117bd5d..9471d7e 100644 --- a/columnq/src/table/mod.rs +++ b/columnq/src/table/mod.rs @@ -188,6 +188,13 @@ impl TableOptionCsv { self } + #[inline] + #[must_use] + pub fn with_use_memory_table(mut self, use_memory: bool) -> Self { + self.use_memory_table = use_memory; + self + } + fn deserialize_delimiter<'de, D>(deserializer: D) -> Result where D: Deserializer<'de>, @@ -200,6 +207,7 @@ impl TableOptionCsv { )), } } + #[inline] pub fn default_use_memory_table() -> bool { true @@ -393,6 +401,7 @@ pub struct TableSource { pub option: Option, #[serde(default = "TableSource::default_batch_size")] pub batch_size: usize, + pub partition_columns: Option>, } impl From for TableSource { @@ -403,6 +412,7 @@ impl From for TableSource { schema: kv.schema, option: kv.option, batch_size: Self::default_batch_size(), + partition_columns: None, } } } @@ -417,9 +427,18 @@ impl TableSource { schema: None, option, batch_size: Self::default_batch_size(), + partition_columns: None, } } + pub fn datafusion_partition_cols(&self) -> Option> { + self.partition_columns.as_ref().map(|cols| { + cols.iter() + .map(|col| (col.name.to_string(), col.data_type.clone())) + .collect::>() + }) + } + pub fn new_with_uri(name: impl Into, uri: impl Into) -> Self { Self::new(name, uri.into()) } @@ -443,6 +462,13 @@ impl TableSource { self } + #[inline] + #[must_use] + pub fn with_partition_columns(mut self, partitions: Vec) -> Self { + self.partition_columns = Some(partitions); + self + } + pub fn get_uri_str(&self) -> &str { match &self.io_source { TableIoSource::Uri(uri) => uri.as_str(), diff --git a/columnq/src/table/parquet.rs b/columnq/src/table/parquet.rs index ab1f4b4..ebbafbb 100644 --- a/columnq/src/table/parquet.rs +++ b/columnq/src/table/parquet.rs @@ -62,7 +62,11 @@ pub async fn to_datafusion_table( let table_url = ListingTableUrl::parse(t.get_uri_str()) .context(ParseUriSnafu) .context(table::LoadParquetSnafu)?; - let options = ListingOptions::new(Arc::new(ParquetFormat::default())); + let mut options = ListingOptions::new(Arc::new(ParquetFormat::default())); + if let Some(partition_cols) = t.datafusion_partition_cols() { + options = options.with_table_partition_cols(partition_cols) + } + let schemaref = match &t.schema { Some(s) => Arc::new(s.into()), None => options diff --git a/roapi/tests/partitioned_table_test.rs b/roapi/tests/partitioned_table_test.rs new file mode 100644 index 0000000..42c3347 --- /dev/null +++ b/roapi/tests/partitioned_table_test.rs @@ -0,0 +1,49 @@ +mod helpers; + +use columnq::datafusion::arrow::datatypes::DataType; +use columnq::table::{TableColumn, TableLoadOption, TableOptionCsv, TableSource}; + +#[tokio::test] +async fn test_partitioned_csv_table() { + let table_path = helpers::test_data_path("partitioned_csv"); + let table = TableSource::new("partitioned_csv".to_string(), table_path) + .with_option(TableLoadOption::csv( + TableOptionCsv::default().with_use_memory_table(false), + )) + .with_partition_columns(vec![ + TableColumn { + name: "year".to_string(), + data_type: DataType::UInt16, + nullable: false, + }, + TableColumn { + name: "month".to_string(), + data_type: DataType::UInt16, + nullable: false, + }, + ]); + + let (app, address) = helpers::test_api_app_with_tables(vec![table]).await; + tokio::spawn(app.run_until_stopped()); + + let response = helpers::http_post( + &format!("{address}/api/sql"), + "SELECT * FROM partitioned_csv ORDER BY ts ASC", + ) + .await; + + let status = response.status(); + let data = response.json::().await.unwrap(); + assert_eq!( + data, + serde_json::json!([ + {"year": 2022, "month": 12, "ts": 100, "value": 0.5}, + {"year": 2022, "month": 12, "ts": 101, "value": 7.8}, + {"year": 2022, "month": 12, "ts": 102, "value": 4.0}, + {"year": 2023, "month": 1, "ts": 201, "value": -1.0}, + {"year": 2023, "month": 1, "ts": 202, "value": 100.0}, + {"year": 2023, "month": 1, "ts": 203, "value": 0.0}, + ]) + ); + assert_eq!(status, 200); +} diff --git a/test_data/partitioned_csv/year=2022/month=12/p001.csv b/test_data/partitioned_csv/year=2022/month=12/p001.csv new file mode 100644 index 0000000..3c9554c --- /dev/null +++ b/test_data/partitioned_csv/year=2022/month=12/p001.csv @@ -0,0 +1,4 @@ +ts,value +100,0.5 +101,7.8 +102,4 diff --git a/test_data/partitioned_csv/year=2023/month=1/p001.csv b/test_data/partitioned_csv/year=2023/month=1/p001.csv new file mode 100644 index 0000000..51cdd39 --- /dev/null +++ b/test_data/partitioned_csv/year=2023/month=1/p001.csv @@ -0,0 +1,4 @@ +ts,value +201,-1 +202,100.0 +203,0