diff --git a/columnq/src/columnq.rs b/columnq/src/columnq.rs index f6b859d..e698731 100644 --- a/columnq/src/columnq.rs +++ b/columnq/src/columnq.rs @@ -254,7 +254,7 @@ impl ColumnQ { table_name: &str, params: &HashMap, ) -> Result, QueryError> { - query::rest::query_table(&self.dfctx, table_name, params).await + query::rest::exec_table_query(&self.dfctx, table_name, params).await } pub fn kv_get(&self, kv_name: &str, key: &str) -> Result, QueryError> { diff --git a/columnq/src/query/rest.rs b/columnq/src/query/rest.rs index 63adf45..b8d9481 100644 --- a/columnq/src/query/rest.rs +++ b/columnq/src/query/rest.rs @@ -53,9 +53,9 @@ fn num_parse_err(e: std::num::ParseIntError) -> QueryError { } } -pub async fn table_query_to_df( - dfctx: &datafusion::execution::context::SessionContext, - table_name: &str, +/// Applies a rest query to the provided DataFrame. +pub fn apply_query( + mut df: datafusion::dataframe::DataFrame, params: &HashMap, ) -> Result { lazy_static! { @@ -63,11 +63,6 @@ pub async fn table_query_to_df( Regex::new(r"filter\[(?P.+)\](?P.+)?").unwrap(); } - let mut df = dfctx - .table(table_name) - .await - .map_err(|e| QueryError::invalid_table(e, table_name))?; - // filter[col1]eq='foo' // filter[col2]lt=2 for (key, val) in params.iter().filter(|(k, _)| k.starts_with("filter[")) { @@ -158,7 +153,22 @@ pub async fn table_query_to_df( Ok(df) } -pub async fn query_table( +/// Rest query to a DataFrame using the given table name and SessionContext. +pub async fn table_query_to_df( + dfctx: &datafusion::execution::context::SessionContext, + table_name: &str, + params: &HashMap, +) -> Result { + let df = dfctx + .table(table_name) + .await + .map_err(|e| QueryError::invalid_table(e, table_name))?; + + apply_query(df, params) +} + +/// Executes a rest query using the provided table name and SessionContext. +pub async fn exec_table_query( dfctx: &datafusion::execution::context::SessionContext, table_name: &str, params: &HashMap, @@ -167,6 +177,17 @@ pub async fn query_table( df.collect().await.map_err(QueryError::query_exec) } +/// Executes a rest query using the provided DataFrame. +pub async fn exec_query_with_df( + df: datafusion::dataframe::DataFrame, + params: &HashMap, +) -> Result, QueryError> { + apply_query(df, params)? + .collect() + .await + .map_err(QueryError::query_exec) +} + #[cfg(test)] mod tests { use super::*; @@ -176,6 +197,37 @@ mod tests { use crate::test_util::*; + #[tokio::test] + async fn simple_query_with_column_aliases() { + let mut dfctx = SessionContext::new(); + register_table_ubuntu_ami(&mut dfctx).await; + + let modified_df = dfctx + .table("ubuntu_ami") + .await + .unwrap() + .select(vec![col("ami_id").alias("aid"), col("version").alias("v")]) + .unwrap(); + + let mut params = HashMap::::new(); + params.insert("limit".to_string(), "10".to_string()); + params.insert("sort".to_string(), "aid".to_string()); + params.insert("columns".to_string(), "aid,v".to_string()); + + let df = apply_query(modified_df.clone(), ¶ms).unwrap(); + + let expected_df = modified_df + .select(vec![col("aid"), col("v")]) + .unwrap() + .sort(vec![column_sort_expr_asc("aid")]) + .unwrap() + .limit(0, Some(10)) + .unwrap() + .into(); + + assert_eq_df(df.into(), expected_df); + } + #[tokio::test] async fn consistent_and_deterministics_logical_plan() { let mut dfctx = SessionContext::new(); @@ -220,7 +272,9 @@ mod tests { params.insert("filter[arch]".to_string(), "'amd64'".to_string()); params.insert("filter[zone]".to_string(), "'us-east-2'".to_string()); - let batches = query_table(&dfctx, "ubuntu_ami", ¶ms).await.unwrap(); + let batches = exec_table_query(&dfctx, "ubuntu_ami", ¶ms) + .await + .unwrap(); let batch = &batches[0]; assert_eq!(