feat: support datafusion config (#288)

This commit is contained in:
Joe 2023-08-13 09:09:12 +08:00 committed by GitHub
parent 921aef9f01
commit 500e535caa
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 86 additions and 1 deletions

View File

@ -1,3 +1,6 @@
use anyhow::Ok;
use columnq::datafusion::config::ConfigOptions;
use columnq::SessionConfig;
use serde_derive::Deserialize;
use anyhow::{bail, Context, Result};
@ -6,6 +9,7 @@ use columnq::encoding;
use columnq::table::parse_table_uri_arg;
use columnq::table::KeyValueSource;
use columnq::table::TableSource;
use std::collections::HashMap;
use std::fs;
use std::time::Duration;
@ -26,6 +30,8 @@ pub struct Config {
pub kvstores: Vec<KeyValueSource>,
#[serde(default)]
pub response_format: encoding::ContentType,
#[serde(default)]
pub datafusion: Option<HashMap<String, String>>,
}
fn table_arg() -> clap::Arg<'static> {
@ -164,3 +170,18 @@ pub fn get_configuration() -> Result<Config, anyhow::Error> {
Ok(config)
}
impl Config {
    /// Builds a DataFusion `SessionConfig` from the optional `datafusion` section.
    ///
    /// Every key of the section is prefixed with `datafusion.` and applied on top of
    /// `ConfigOptions::default()`. When the section is absent,
    /// `SessionConfig::default()` is returned.
    ///
    /// # Errors
    ///
    /// Returns an error naming the offending key and value when an option cannot be set.
    pub fn get_datafusion_config(&self) -> Result<SessionConfig> {
        match &self.datafusion {
            Some(df_cfg) => {
                let mut opt = ConfigOptions::default();
                for (k, v) in df_cfg {
                    let key = format!("datafusion.{}", k);
                    // Attach the key/value so a bad entry in the config file is easy to
                    // locate; the underlying error is kept as the source.
                    opt.set(key.as_str(), v).with_context(|| {
                        format!("Failed to set datafusion option `{}` to `{}`", key, v)
                    })?;
                }
                Ok(opt.into())
            }
            None => Ok(SessionConfig::default()),
        }
    }
}

View File

@ -23,7 +23,10 @@ pub struct RawRoapiContext {
impl RawRoapiContext {
pub async fn new(config: &Config) -> anyhow::Result<Self> {
let mut cq = ColumnQ::new();
let mut cq = match config.get_datafusion_config() {
Ok(df_cfg) => ColumnQ::new_with_config(df_cfg),
_ => ColumnQ::new(),
};
if config.tables.is_empty() && config.kvstores.is_empty() {
anyhow::bail!("No table nor kvstore found in config");

View File

@ -0,0 +1,22 @@
use anyhow::Result;
use roapi::config::Config;
use std::fs;
mod helpers;
/// Loads the YAML fixture containing a `datafusion` section and checks that each
/// configured option is reflected in the resulting session config.
#[test]
fn test_load_yaml_datafusion_config() -> Result<()> {
    let config_path = helpers::test_data_path("./test_datafusion_config.yml");
    let config_content = fs::read_to_string(config_path)?;
    let cfg: Config = serde_yaml::from_str(&config_content)?;

    let df_cfg = cfg.get_datafusion_config()?;
    // Fetch the options once instead of on every assertion.
    let options = df_cfg.options();

    assert_eq!(options.sql_parser.dialect, "Hive");
    assert!(options.explain.physical_plan_only);
    assert_eq!(options.optimizer.max_passes, 10);
    assert_eq!(options.execution.batch_size, 100);
    // Compare by borrowed `&str` to avoid allocating a `String` for the expectation.
    assert_eq!(options.catalog.format.as_deref(), Some("parquet"));
    Ok(())
}

View File

@ -13,6 +13,7 @@ pub fn test_data_path(relative_path: &str) -> String {
d.to_string_lossy().to_string()
}
/// Convenience wrapper around `test_api_app` for callers that only supply tables.
///
/// Forwards `tables` unchanged and passes an empty vector as the second argument,
/// returning whatever `test_api_app` yields.
// NOTE(review): the lint allowance is presumably needed because not every test
// binary that includes this helpers module calls this function — confirm.
#[allow(dead_code)]
pub async fn test_api_app_with_tables(tables: Vec<TableSource>) -> (Application, String) {
    let empty = Vec::new();
    test_api_app(tables, empty).await
}
@ -66,6 +67,7 @@ pub async fn http_post(url: &str, payload: impl Into<reqwest::Body>) -> reqwest:
.expect("Unable to execute POST request")
}
#[allow(dead_code)]
pub fn get_spacex_table() -> TableSource {
let json_source_path = test_data_path("spacex_launches.json");
TableSource::new("spacex_launches".to_string(), json_source_path)

View File

@ -0,0 +1,37 @@
# Sample configuration that includes a `datafusion` section in addition to
# listener addresses and table definitions.
addr:
# binding address for TCP port that speaks HTTP protocol
http: 0.0.0.0:8000
# binding address for TCP port that speaks Postgres wire protocol
postgres: 0.0.0.0:5432
# Table definitions: each entry gives a table name, a source URI and per-table options.
tables:
- name: "s3_blogs"
uri: "s3://test-data/blogs.parquet"
option:
format: "parquet"
use_memory_table: true
# Same kind of S3 object, but with a percent-encoded space in the URI.
- name: "s3_blogs_space_encode"
uri: "s3://test-data/blogs%20space.parquet"
option:
format: "parquet"
use_memory_table: true
# URI ends with a slash rather than naming a single file.
- name: "s3_blogs_dir"
uri: "s3://test-data/blogs/"
option:
format: "parquet"
use_memory_table: true
- name: "gcs_blogs"
uri: "gs://test-data/blogs.parquet"
option:
format: "parquet"
use_memory_table: true
- name: "azure_blogs"
uri: "az://test-data/blogs.parquet"
option:
format: "parquet"
use_memory_table: true
# DataFusion options as key/value pairs.
# NOTE(review): presumably consumed by `Config::get_datafusion_config`, which prefixes
# each key with `datafusion.` (e.g. `execution.batch_size` becomes
# `datafusion.execution.batch_size`) — confirm this file is the fixture it loads.
datafusion:
sql_parser.dialect: "Hive"
explain.physical_plan_only: true
optimizer.max_passes: 10
execution.batch_size: 100
catalog.format: "parquet"