columnq: add apply_query() and exec_query_with_df() to query::rest module (#349)

Follow-up to PR #348.

This PR exposes two functions from the columnq::query::rest module:

    apply_query()
    exec_query_with_df()

This PR also includes breaking changes to ensure consistency with the
function names in the `columnq::query::graphql` module. I renamed the
function `query_table` to `exec_table_query`.
This commit is contained in:
hozan23 2024-10-21 02:14:01 +02:00 committed by GitHub
parent e282c6c1d1
commit daabd84418
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 65 additions and 11 deletions

View File

@ -254,7 +254,7 @@ impl ColumnQ {
table_name: &str,
params: &HashMap<String, String>,
) -> Result<Vec<arrow::record_batch::RecordBatch>, QueryError> {
query::rest::query_table(&self.dfctx, table_name, params).await
query::rest::exec_table_query(&self.dfctx, table_name, params).await
}
pub fn kv_get(&self, kv_name: &str, key: &str) -> Result<Option<&String>, QueryError> {

View File

@ -53,9 +53,9 @@ fn num_parse_err(e: std::num::ParseIntError) -> QueryError {
}
}
pub async fn table_query_to_df(
dfctx: &datafusion::execution::context::SessionContext,
table_name: &str,
/// Applies a rest query to the provided DataFrame.
pub fn apply_query(
mut df: datafusion::dataframe::DataFrame,
params: &HashMap<String, String>,
) -> Result<datafusion::dataframe::DataFrame, QueryError> {
lazy_static! {
@ -63,11 +63,6 @@ pub async fn table_query_to_df(
Regex::new(r"filter\[(?P<column>.+)\](?P<op>.+)?").unwrap();
}
let mut df = dfctx
.table(table_name)
.await
.map_err(|e| QueryError::invalid_table(e, table_name))?;
// filter[col1]eq='foo'
// filter[col2]lt=2
for (key, val) in params.iter().filter(|(k, _)| k.starts_with("filter[")) {
@ -158,7 +153,22 @@ pub async fn table_query_to_df(
Ok(df)
}
pub async fn query_table(
/// Looks up `table_name` in the given SessionContext and applies the rest query described by
/// `params` to the resulting DataFrame.
///
/// # Errors
///
/// Returns `QueryError::invalid_table` when the table lookup fails, or whatever error
/// `apply_query` produces for the given `params`.
pub async fn table_query_to_df(
    dfctx: &datafusion::execution::context::SessionContext,
    table_name: &str,
    params: &HashMap<String, String>,
) -> Result<datafusion::dataframe::DataFrame, QueryError> {
    match dfctx.table(table_name).await {
        Ok(table_df) => apply_query(table_df, params),
        Err(e) => Err(QueryError::invalid_table(e, table_name)),
    }
}
/// Executes a rest query using the provided table name and SessionContext.
pub async fn exec_table_query(
dfctx: &datafusion::execution::context::SessionContext,
table_name: &str,
params: &HashMap<String, String>,
@ -167,6 +177,17 @@ pub async fn query_table(
df.collect().await.map_err(QueryError::query_exec)
}
/// Applies the rest query described by `params` to `df`, then executes it and collects the
/// resulting record batches.
///
/// # Errors
///
/// Propagates any error from `apply_query`; execution failures are wrapped with
/// `QueryError::query_exec`.
pub async fn exec_query_with_df(
    df: datafusion::dataframe::DataFrame,
    params: &HashMap<String, String>,
) -> Result<Vec<RecordBatch>, QueryError> {
    let queried_df = apply_query(df, params)?;
    queried_df.collect().await.map_err(QueryError::query_exec)
}
#[cfg(test)]
mod tests {
use super::*;
@ -176,6 +197,37 @@ mod tests {
use crate::test_util::*;
#[tokio::test]
async fn simple_query_with_column_aliases() {
    let mut dfctx = SessionContext::new();
    register_table_ubuntu_ami(&mut dfctx).await;

    // Start from a DataFrame whose columns have been renamed via aliases.
    let aliased_df = dfctx
        .table("ubuntu_ami")
        .await
        .unwrap()
        .select(vec![col("ami_id").alias("aid"), col("version").alias("v")])
        .unwrap();

    // The rest query refers to the aliased column names.
    let params = HashMap::<String, String>::from([
        ("limit".to_string(), "10".to_string()),
        ("sort".to_string(), "aid".to_string()),
        ("columns".to_string(), "aid,v".to_string()),
    ]);

    let actual_df = apply_query(aliased_df.clone(), &params).unwrap();

    // Build the expected plan by hand: projection, then sort, then limit.
    let projected_df = aliased_df.select(vec![col("aid"), col("v")]).unwrap();
    let sorted_df = projected_df
        .sort(vec![column_sort_expr_asc("aid")])
        .unwrap();
    let expected_df = sorted_df.limit(0, Some(10)).unwrap().into();

    assert_eq_df(actual_df.into(), expected_df);
}
#[tokio::test]
async fn consistent_and_deterministics_logical_plan() {
let mut dfctx = SessionContext::new();
@ -220,7 +272,9 @@ mod tests {
params.insert("filter[arch]".to_string(), "'amd64'".to_string());
params.insert("filter[zone]".to_string(), "'us-east-2'".to_string());
let batches = query_table(&dfctx, "ubuntu_ami", &params).await.unwrap();
let batches = exec_table_query(&dfctx, "ubuntu_ami", &params)
.await
.unwrap();
let batch = &batches[0];
assert_eq!(