support partitioned tables (#307)

This commit is contained in:
QP Hou 2023-10-20 16:00:07 -07:00 committed by GitHub
parent ea01ffe0d3
commit d531f8ae46
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 94 additions and 2 deletions

View File

@ -48,7 +48,11 @@ pub async fn to_datafusion_table(
ListingTableUrl::parse(t.get_uri_str()).with_context(|_| table::ListingTableUriSnafu {
uri: t.get_uri_str().to_string(),
})?;
let options = ListingOptions::new(Arc::new(CsvFormat::default()));
let mut options = ListingOptions::new(Arc::new(CsvFormat::default()));
if let Some(partition_cols) = t.datafusion_partition_cols() {
options = options.with_table_partition_cols(partition_cols)
}
let schemaref = match &t.schema {
Some(s) => Arc::new(s.into()),
None => options
@ -64,6 +68,7 @@ pub async fn to_datafusion_table(
ListingTable::try_new(table_config).context(table::CreateListingTableSnafu)?,
))
}
pub async fn to_mem_table(
t: &TableSource,
dfctx: &datafusion::execution::context::SessionContext,

View File

@ -188,6 +188,13 @@ impl TableOptionCsv {
self
}
#[inline]
#[must_use]
pub fn with_use_memory_table(mut self, use_memory: bool) -> Self {
self.use_memory_table = use_memory;
self
}
fn deserialize_delimiter<'de, D>(deserializer: D) -> Result<u8, D::Error>
where
D: Deserializer<'de>,
@ -200,6 +207,7 @@ impl TableOptionCsv {
)),
}
}
#[inline]
pub fn default_use_memory_table() -> bool {
true
@ -393,6 +401,7 @@ pub struct TableSource {
pub option: Option<TableLoadOption>,
#[serde(default = "TableSource::default_batch_size")]
pub batch_size: usize,
pub partition_columns: Option<Vec<TableColumn>>,
}
impl From<KeyValueSource> for TableSource {
@ -403,6 +412,7 @@ impl From<KeyValueSource> for TableSource {
schema: kv.schema,
option: kv.option,
batch_size: Self::default_batch_size(),
partition_columns: None,
}
}
}
@ -417,9 +427,18 @@ impl TableSource {
schema: None,
option,
batch_size: Self::default_batch_size(),
partition_columns: None,
}
}
pub fn datafusion_partition_cols(&self) -> Option<Vec<(String, arrow::datatypes::DataType)>> {
self.partition_columns.as_ref().map(|cols| {
cols.iter()
.map(|col| (col.name.to_string(), col.data_type.clone()))
.collect::<Vec<_>>()
})
}
pub fn new_with_uri(name: impl Into<String>, uri: impl Into<String>) -> Self {
Self::new(name, uri.into())
}
@ -443,6 +462,13 @@ impl TableSource {
self
}
#[inline]
#[must_use]
pub fn with_partition_columns(mut self, partitions: Vec<TableColumn>) -> Self {
self.partition_columns = Some(partitions);
self
}
pub fn get_uri_str(&self) -> &str {
match &self.io_source {
TableIoSource::Uri(uri) => uri.as_str(),

View File

@ -62,7 +62,11 @@ pub async fn to_datafusion_table(
let table_url = ListingTableUrl::parse(t.get_uri_str())
.context(ParseUriSnafu)
.context(table::LoadParquetSnafu)?;
let options = ListingOptions::new(Arc::new(ParquetFormat::default()));
let mut options = ListingOptions::new(Arc::new(ParquetFormat::default()));
if let Some(partition_cols) = t.datafusion_partition_cols() {
options = options.with_table_partition_cols(partition_cols)
}
let schemaref = match &t.schema {
Some(s) => Arc::new(s.into()),
None => options

View File

@ -0,0 +1,49 @@
mod helpers;
use columnq::datafusion::arrow::datatypes::DataType;
use columnq::table::{TableColumn, TableLoadOption, TableOptionCsv, TableSource};
#[tokio::test]
async fn test_partitioned_csv_table() {
let table_path = helpers::test_data_path("partitioned_csv");
let table = TableSource::new("partitioned_csv".to_string(), table_path)
.with_option(TableLoadOption::csv(
TableOptionCsv::default().with_use_memory_table(false),
))
.with_partition_columns(vec![
TableColumn {
name: "year".to_string(),
data_type: DataType::UInt16,
nullable: false,
},
TableColumn {
name: "month".to_string(),
data_type: DataType::UInt16,
nullable: false,
},
]);
let (app, address) = helpers::test_api_app_with_tables(vec![table]).await;
tokio::spawn(app.run_until_stopped());
let response = helpers::http_post(
&format!("{address}/api/sql"),
"SELECT * FROM partitioned_csv ORDER BY ts ASC",
)
.await;
let status = response.status();
let data = response.json::<serde_json::Value>().await.unwrap();
assert_eq!(
data,
serde_json::json!([
{"year": 2022, "month": 12, "ts": 100, "value": 0.5},
{"year": 2022, "month": 12, "ts": 101, "value": 7.8},
{"year": 2022, "month": 12, "ts": 102, "value": 4.0},
{"year": 2023, "month": 1, "ts": 201, "value": -1.0},
{"year": 2023, "month": 1, "ts": 202, "value": 100.0},
{"year": 2023, "month": 1, "ts": 203, "value": 0.0},
])
);
assert_eq!(status, 200);
}

View File

@ -0,0 +1,4 @@
ts,value
100,0.5
101,7.8
102,4
1 ts value
2 100 0.5
3 101 7.8
4 102 4

View File

@ -0,0 +1,4 @@
ts,value
201,-1
202,100.0
203,0
1 ts value
2 201 -1
3 202 100.0
4 203 0