diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index 8e2f710..7556caa 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -49,7 +49,7 @@ jobs: with: profile: default # toolchain: nightly - toolchain: nightly-2021-03-24 + toolchain: nightly-2021-07-17 override: true - name: Run tests run: cargo test --features simd diff --git a/Cargo.lock b/Cargo.lock index 71e082f..5035651 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -605,6 +605,7 @@ dependencies = [ "env_logger", "log", "rustyline", + "serde_json", "tokio", ] @@ -740,7 +741,7 @@ dependencies = [ [[package]] name = "datafusion" version = "4.0.0-SNAPSHOT" -source = "git+https://github.com/houqp/arrow-datafusion.git?rev=e97b86a8bc410983a73b5802ae44eb7a55faecd3#e97b86a8bc410983a73b5802ae44eb7a55faecd3" +source = "git+https://github.com/houqp/arrow-datafusion.git?rev=d5427d8dd1fd8bd6fb2541c1c9386802da7783de#d5427d8dd1fd8bd6fb2541c1c9386802da7783de" dependencies = [ "ahash", "arrow", diff --git a/README.md b/README.md index ab9e26d..eba8f00 100644 --- a/README.md +++ b/README.md @@ -216,8 +216,8 @@ Query layer: - [x] REST API GET - [x] GraphQL - [x] SQL - - [ ] join between tables - - [ ] support filter on nested struct fields + - [x] join between tables + - [ ] support query on nested struct fields - [ ] index - protocol - [ ] gRPC diff --git a/columnq-cli/Cargo.toml b/columnq-cli/Cargo.toml index bcce05d..6e639b4 100644 --- a/columnq-cli/Cargo.toml +++ b/columnq-cli/Cargo.toml @@ -11,6 +11,7 @@ path = "src/main.rs" [dependencies] columnq = { path = "../columnq" } +serde_json = "*" log = "0" arrow = "4" tokio = "1" diff --git a/columnq-cli/README.md b/columnq-cli/README.md new file mode 100644 index 0000000..6668107 --- /dev/null +++ b/columnq-cli/README.md @@ -0,0 +1,22 @@ +Columnq +======= + +Easy to use library and CLI to help you query tabular data with support for a +rich set of growing formats data sources. + + +Usage +----- + +Show schemas for a specific table: + +``` +columnq(sql)> show columns from uk_cities; ++---------------+--------------+------------+-------------+-----------+-------------+ +| table_catalog | table_schema | table_name | column_name | data_type | is_nullable | ++---------------+--------------+------------+-------------+-----------+-------------+ +| datafusion | public | uk_cities | city | Utf8 | NO | +| datafusion | public | uk_cities | lat | Float64 | NO | +| datafusion | public | uk_cities | lng | Float64 | NO | ++---------------+--------------+------------+-------------+-----------+-------------+ +``` diff --git a/columnq-cli/src/main.rs b/columnq-cli/src/main.rs index 89d5455..fcc4c62 100644 --- a/columnq-cli/src/main.rs +++ b/columnq-cli/src/main.rs @@ -3,9 +3,11 @@ use arrow::util::pretty; use log::debug; use rustyline::error::ReadlineError; use rustyline::Editor; +use serde_json; +use std::io::Read; use std::path::PathBuf; -use columnq::table::TableSource; +use columnq::table::{TableIoSource, TableLoadOption, TableSource}; use columnq::{ColumnQ, ExecutionConfig}; fn config_path() -> anyhow::Result { @@ -18,6 +20,61 @@ fn config_path() -> anyhow::Result { Ok(home) } +fn table_arg() -> clap::Arg<'static> { + clap::Arg::new("table") + .about("Table sources to load. Table option can be provided as optional setting as part of the table URI, for example: blogs:s3://bucket/key,format=delta. Set table uri to `stdin` if you want to consume table data from stdin as part of a UNIX pipe.") + .takes_value(true) + .required(false) + .number_of_values(1) + .multiple(true) + .value_name("table_name:uri[,option_key=option_value]") + .long("table") + .short('t') +} + +fn parse_table_uri_arg(uri_arg: &str) -> anyhow::Result { + let mut split = uri_arg.splitn(2, ':'); + let table_name = split + .next() + .ok_or_else(|| anyhow::anyhow!("invalid table config: {}", uri_arg))?; + let uri = split + .next() + .ok_or_else(|| anyhow::anyhow!("invalid table config: {}", uri_arg))?; + + // separate uri from table load options + let mut uri_parts = uri.split(','); + let uri = uri_parts + .next() + .ok_or_else(|| anyhow::anyhow!("invalid table URI: {}", uri))?; + + let t = if uri == "stdin" { + let mut buffer = String::new(); + std::io::stdin().read_to_string(&mut buffer)?; + TableSource::new(table_name, TableIoSource::Memory(buffer.into_bytes())) + } else { + TableSource::new(table_name, uri.to_string()) + }; + + // parse extra options from table uri + let mut option_json = serde_json::map::Map::new(); + for opt_str in uri_parts.into_iter() { + let mut parts = opt_str.splitn(2, '='); + let opt_key = parts + .next() + .ok_or_else(|| anyhow::anyhow!("invalid table option: {:?}", opt_str))?; + let opt_value = parts + .next() + .ok_or_else(|| anyhow::anyhow!("invalid table option: {:?}", opt_str))?; + option_json.insert( + opt_key.to_string(), + serde_json::from_str(opt_value).unwrap_or_else(|_| opt_value.into()), + ); + } + + let opt: TableLoadOption = serde_json::from_value(serde_json::Value::Object(option_json))?; + Ok(t.with_option(opt)) +} + async fn console_loop(cq: &ColumnQ) -> anyhow::Result<()> { let mut path = config_path()?; if !path.as_path().exists() { @@ -72,22 +129,41 @@ async fn cmd_console(args: &clap::ArgMatches) -> anyhow::Result<()> { if let Some(tables) = args.values_of("table") { for v in tables { - let mut split = v.splitn(2, ':'); - let table_name = split - .next() - .ok_or_else(|| anyhow::anyhow!("invalid table config: {}", v))?; - let uri = split - .next() - .ok_or_else(|| anyhow::anyhow!("invalid table config: {}", v))?; - - let t = TableSource::new(table_name.to_string(), uri.to_string()); - cq.load_table(&t).await?; + cq.load_table(&parse_table_uri_arg(v)?).await?; } } console_loop(&cq).await } +async fn cmd_sql(args: &clap::ArgMatches) -> anyhow::Result<()> { + let config = ExecutionConfig::default().with_information_schema(true); + let mut cq = ColumnQ::new_with_config(config); + + if let Some(tables) = args.values_of("table") { + for v in tables { + cq.load_table(&parse_table_uri_arg(v)?).await?; + } + } + + match args.value_of("SQL") { + Some(query) => match cq.query_sql(&query).await { + Ok(batches) => match args.value_of("output").unwrap_or("table") { + "table" => pretty::print_batches(&batches)?, + other => anyhow::bail!("unsupported output format: {}", other), + }, + Err(e) => { + println!("Error: {}", e); + } + }, + None => { + unreachable!(); + } + } + + Ok(()) +} + #[tokio::main(flavor = "current_thread")] async fn main() -> anyhow::Result<()> { env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("info")).init(); @@ -98,23 +174,40 @@ async fn main() -> anyhow::Result<()> { .setting(clap::AppSettings::SubcommandRequiredElseHelp) .setting(clap::AppSettings::VersionlessSubcommands) .subcommand( - clap::App::new("console") - .about("dump table metadata info") + clap::App::new("sql") + .about("Query tables with SQL") .setting(clap::AppSettings::ArgRequiredElseHelp) - .args(&[clap::Arg::new("table") - .about("table sources to load") - .takes_value(true) - .required(false) - .number_of_values(1) - .multiple(true) - .value_name("table_name:uri") - .long("table") - .short('t')]), + .args(&[ + clap::Arg::new("SQL") + .about("SQL query to execute") + .index(1) + .required(true) + .takes_value(true) + .number_of_values(1), + clap::Arg::new("output") + .about("Query output format") + .long("output") + .short('o') + .required(false) + .takes_value(true) + .number_of_values(1) + .default_value("table") + // TODO: add yaml + .possible_values(&["table", "json", "csv"]), + table_arg(), + ]), + ) + .subcommand( + clap::App::new("console") + .about("Query tables through an interactive console") + .setting(clap::AppSettings::ArgRequiredElseHelp) + .args(&[table_arg()]), ); let matches = app.get_matches(); match matches.subcommand() { Some(("console", console_matches)) => cmd_console(console_matches).await?, + Some(("sql", console_matches)) => cmd_sql(console_matches).await?, _ => unreachable!(), } diff --git a/columnq/Cargo.toml b/columnq/Cargo.toml index 4dc82bf..2c7b0c2 100644 --- a/columnq/Cargo.toml +++ b/columnq/Cargo.toml @@ -11,7 +11,7 @@ path = "src/lib.rs" [dependencies] arrow = "4" parquet = "4" -datafusion = { git = "https://github.com/houqp/arrow-datafusion.git", rev = "e97b86a8bc410983a73b5802ae44eb7a55faecd3" } +datafusion = { git = "https://github.com/houqp/arrow-datafusion.git", rev = "d5427d8dd1fd8bd6fb2541c1c9386802da7783de" } log = "0" regex = "1" diff --git a/columnq/src/error.rs b/columnq/src/error.rs index 458f1c7..e86d47f 100644 --- a/columnq/src/error.rs +++ b/columnq/src/error.rs @@ -55,6 +55,9 @@ pub enum ColumnQError { #[from] source: datafusion::error::DataFusionError, }, + + #[error("Generic error: {0}")] + Generic(String), } impl ColumnQError { diff --git a/columnq/src/io/http.rs b/columnq/src/io/http.rs index 761e2cd..1e7c54a 100644 --- a/columnq/src/io/http.rs +++ b/columnq/src/io/http.rs @@ -11,7 +11,7 @@ pub async fn partitions_from_uri<'a, F, T>( where F: FnMut(std::io::Cursor) -> Result, { - let resp = reqwest::get(&t.uri) + let resp = reqwest::get(t.get_uri_str()) .await .map_err(|e| ColumnQError::HttpStore(e.to_string()))?; if resp.status().as_u16() / 100 != 2 { diff --git a/columnq/src/io/memory.rs b/columnq/src/io/memory.rs new file mode 100644 index 0000000..42fc3c5 --- /dev/null +++ b/columnq/src/io/memory.rs @@ -0,0 +1,15 @@ +use crate::error::ColumnQError; +use crate::table::TableSource; + +pub async fn partitions_from_memory<'a, F, T>( + t: &'a TableSource, + mut partition_reader: F, +) -> Result, ColumnQError> +where + F: FnMut(std::io::Cursor<&'a [u8]>) -> Result, +{ + let reader = std::io::Cursor::new(t.io_source.as_memory()?); + // There is no concept of directory listing for in memory data, so we always only return a + // single partition + Ok(vec![partition_reader(reader)?]) +} diff --git a/columnq/src/io/mod.rs b/columnq/src/io/mod.rs index 4600936..7aad827 100644 --- a/columnq/src/io/mod.rs +++ b/columnq/src/io/mod.rs @@ -2,14 +2,17 @@ use std::convert::TryFrom; pub mod fs; pub mod http; +pub mod memory; pub mod s3; use crate::error::ColumnQError; +#[derive(Debug, Clone, PartialEq, Eq)] pub enum BlobStoreType { Http, S3, FileSystem, + Memory, } impl TryFrom>> for BlobStoreType { @@ -22,6 +25,7 @@ impl TryFrom>> for BlobStoreType { Some(uriparse::Scheme::HTTP) | Some(uriparse::Scheme::HTTPS) => Ok(BlobStoreType::Http), Some(uriparse::Scheme::Unregistered(s)) => match s.as_str() { "s3" => Ok(BlobStoreType::S3), + "memory" => Ok(BlobStoreType::Memory), _ => Err(ColumnQError::InvalidUri(format!( "Unsupported scheme: {:?}", scheme diff --git a/columnq/src/io/s3.rs b/columnq/src/io/s3.rs index bab89d4..7aa1e63 100644 --- a/columnq/src/io/s3.rs +++ b/columnq/src/io/s3.rs @@ -202,7 +202,8 @@ where F: FnMut(std::io::Cursor>) -> Result, { let client = rusoto_s3::S3Client::new(rusoto_core::Region::default()); - let (bucket, key) = parse_uri(&t.uri)?; + // TODO: use host and path from URIReference instead + let (bucket, key) = parse_uri(&t.get_uri_str())?; let mut partitions = vec![]; diff --git a/columnq/src/lib.rs b/columnq/src/lib.rs index 1d97586..78152ff 100644 --- a/columnq/src/lib.rs +++ b/columnq/src/lib.rs @@ -21,6 +21,9 @@ macro_rules! partitions_from_table_source { io::BlobStoreType::S3 => { io::s3::partitions_from_uri(&$table_source, uri, $call_with_r).await } + io::BlobStoreType::Memory => { + io::memory::partitions_from_memory(&$table_source, $call_with_r).await + } } }}; } diff --git a/columnq/src/table/csv.rs b/columnq/src/table/csv.rs index 1704b11..c425a04 100644 --- a/columnq/src/table/csv.rs +++ b/columnq/src/table/csv.rs @@ -5,16 +5,22 @@ use arrow::record_batch::RecordBatch; use log::debug; use crate::error::ColumnQError; -use crate::table::TableSource; +use crate::table::{TableLoadOption, TableOptionCsv, TableSource}; pub async fn to_mem_table( t: &TableSource, ) -> Result { - // TODO: read csv option from config - let has_header = true; - let delimiter = b','; + let opt = t + .option + .clone() + .unwrap_or_else(|| TableLoadOption::csv(TableOptionCsv::default())); + let opt = opt.as_csv()?; + + let has_header = opt.has_header; + let delimiter = opt.delimiter; + let projection = opt.projection.as_ref(); + let batch_size = 1024; - let projection = None; debug!("inferring csv table schema..."); let schema_ref: arrow::datatypes::SchemaRef = match &t.schema { @@ -48,7 +54,7 @@ pub async fn to_mem_table( Some(delimiter), batch_size, None, - projection.clone(), + projection.cloned(), ); csv_reader @@ -70,7 +76,7 @@ mod tests { use datafusion::datasource::TableProvider; - use crate::table::TableLoadOption; + use crate::table::{TableIoSource, TableLoadOption}; use crate::test_util::*; #[tokio::test] @@ -83,16 +89,40 @@ mod tests { assert!(fs::copy(&source_path, tmp_dir_path.join("2020-01-02.csv"))? > 0); assert!(fs::copy(&source_path, tmp_dir_path.join("2020-01-03.csv"))? > 0); - let t = to_mem_table(&TableSource { - name: "uk_cities".to_string(), - uri: tmp_dir_path.to_string_lossy().to_string(), - schema: None, - option: Some(TableLoadOption::csv {}), - }) + let t = to_mem_table( + &TableSource::new( + "uk_cities".to_string(), + tmp_dir_path.to_string_lossy().to_string(), + ) + .with_option(TableLoadOption::csv( + TableOptionCsv::default().with_has_header(true), + )), + ) .await?; assert_eq!(t.statistics().num_rows, Some(37 * 3)); Ok(()) } + + #[tokio::test] + async fn load_from_memory() -> anyhow::Result<()> { + let csv_content = r#" +c1,c2,c3 +1,"hello",true +2,"world",true +100,"!",false +"# + .to_string(); + + let source = TableSource::new("test", TableIoSource::Memory(csv_content.into_bytes())) + .with_option(TableLoadOption::csv( + TableOptionCsv::default().with_has_header(true), + )); + let t = to_mem_table(&source).await?; + + assert_eq!(t.statistics().num_rows, Some(3)); + + Ok(()) + } } diff --git a/columnq/src/table/delta.rs b/columnq/src/table/delta.rs index 0c3ee75..f24d086 100644 --- a/columnq/src/table/delta.rs +++ b/columnq/src/table/delta.rs @@ -36,7 +36,8 @@ pub async fn to_mem_table( // TODO: make batch size configurable let batch_size = 1024; - let delta_table = deltalake::open_table(&t.uri).await?; + let uri_str = t.get_uri_str(); + let delta_table = deltalake::open_table(uri_str).await?; if delta_table.get_files().is_empty() { return Err(ColumnQError::LoadDelta("empty delta table".to_string())); @@ -67,8 +68,8 @@ pub async fn to_mem_table( } _ => { return Err(ColumnQError::InvalidUri(format!( - "Scheme in table uri not supported for delta table: {:?}", - t.uri + "Scheme in table uri not supported for delta table: {}", + uri_str, ))); } }; diff --git a/columnq/src/table/google_spreadsheets.rs b/columnq/src/table/google_spreadsheets.rs index 031706d..760ae12 100644 --- a/columnq/src/table/google_spreadsheets.rs +++ b/columnq/src/table/google_spreadsheets.rs @@ -307,18 +307,19 @@ pub async fn to_mem_table( static ref RE_GOOGLE_SHEET: Regex = Regex::new(r"https://docs.google.com/spreadsheets/d/(.+)").unwrap(); } - if RE_GOOGLE_SHEET.captures(&t.uri).is_none() { - return Err(ColumnQError::InvalidUri(t.uri.to_string())); + let uri_str = t.get_uri_str(); + if RE_GOOGLE_SHEET.captures(uri_str).is_none() { + return Err(ColumnQError::InvalidUri(uri_str.to_string())); } - let uri = URIReference::try_from(t.uri.as_str())?; + let uri = URIReference::try_from(uri_str)?; let spreadsheet_id = uri.path().segments()[2].as_str(); let opt = t .option .as_ref() .ok_or(ColumnQError::MissingOption)? - .as_google_spreadsheet_opt()?; + .as_google_spreadsheet()?; let token = fetch_auth_token(&opt).await?; let token_str = token.as_str(); diff --git a/columnq/src/table/json.rs b/columnq/src/table/json.rs index 2c72877..62b17fb 100644 --- a/columnq/src/table/json.rs +++ b/columnq/src/table/json.rs @@ -175,12 +175,10 @@ mod tests { #[tokio::test] async fn nested_struct_and_lists() -> Result<(), ColumnQError> { - let t = to_mem_table(&TableSource { - name: "spacex_launches".to_string(), - uri: test_data_path("spacex-launches.json"), - schema: None, - option: None, - }) + let t = to_mem_table(&TableSource::new( + "spacex_launches".to_string(), + test_data_path("spacex-launches.json"), + )) .await?; let schema = t.schema(); diff --git a/columnq/src/table/mod.rs b/columnq/src/table/mod.rs index 7d569ea..56d2fef 100644 --- a/columnq/src/table/mod.rs +++ b/columnq/src/table/mod.rs @@ -2,6 +2,7 @@ use std::convert::TryFrom; use std::ffi::OsStr; use std::path::Path; +use serde::de::{Deserialize, Deserializer}; use serde_derive::Deserialize; use uriparse::URIReference; @@ -52,6 +53,70 @@ pub struct TableOptionGoogleSpreasheet { sheet_title: Option, } +/// CSV table load option +#[derive(Deserialize, Debug, Clone)] +pub struct TableOptionCsv { + #[serde(default = "TableOptionCsv::default_has_header")] + has_header: bool, + #[serde(default = "TableOptionCsv::default_delimiter")] + #[serde(deserialize_with = "TableOptionCsv::deserialize_delimiter")] + delimiter: u8, + #[serde(default = "TableOptionCsv::default_projection")] + projection: Option>, +} + +impl TableOptionCsv { + #[inline] + pub fn default_has_header() -> bool { + true + } + + #[inline] + pub fn default_delimiter() -> u8 { + b',' + } + + #[inline] + pub fn default_projection() -> Option> { + None + } + + #[inline] + pub fn with_delimiter(mut self, d: u8) -> Self { + self.delimiter = d; + self + } + + #[inline] + pub fn with_has_header(mut self, has_header: bool) -> Self { + self.has_header = has_header; + self + } + + fn deserialize_delimiter<'de, D>(deserializer: D) -> Result + where + D: Deserializer<'de>, + { + let buf = String::deserialize(deserializer)?; + match buf.len() { + 1 => Ok(buf.into_bytes()[0]), + _ => Err(serde::de::Error::custom( + "CSV delimiter should be a single character", + )), + } + } +} + +impl Default for TableOptionCsv { + fn default() -> Self { + Self { + has_header: Self::default_has_header(), + delimiter: Self::default_delimiter(), + projection: Self::default_projection(), + } + } +} + // Adding new table format: // * update TableLoadOption enum to add the new variant // * update TableLoadOption.extension @@ -68,7 +133,7 @@ pub enum TableLoadOption { pointer: Option, array_encoded: Option, }, - csv {}, + csv(TableOptionCsv), ndjson {}, parquet {}, google_spreadsheet(TableOptionGoogleSpreasheet), @@ -76,22 +141,63 @@ pub enum TableLoadOption { } impl TableLoadOption { - fn as_google_spreadsheet_opt(&self) -> Result<&TableOptionGoogleSpreasheet, ColumnQError> { + fn as_google_spreadsheet(&self) -> Result<&TableOptionGoogleSpreasheet, ColumnQError> { match self { - TableLoadOption::google_spreadsheet(opt) => Ok(&opt), + Self::google_spreadsheet(opt) => Ok(&opt), _ => Err(ColumnQError::ExpectFormatOption( "google_spreadsheets".to_string(), )), } } + fn as_csv(&self) -> Result<&TableOptionCsv, ColumnQError> { + match self { + Self::csv(opt) => Ok(&opt), + _ => Err(ColumnQError::ExpectFormatOption("csv".to_string())), + } + } + pub fn extension<'a>(&'a self) -> &'static str { match self { - TableLoadOption::json { .. } => "json", - TableLoadOption::ndjson { .. } => "ndjson", - TableLoadOption::csv { .. } => "csv", - TableLoadOption::parquet { .. } => "parquet", - TableLoadOption::google_spreadsheet(_) | TableLoadOption::delta { .. } => "", + Self::json { .. } => "json", + Self::ndjson { .. } => "ndjson", + Self::csv { .. } => "csv", + Self::parquet { .. } => "parquet", + Self::google_spreadsheet(_) | Self::delta { .. } => "", + } + } +} + +#[derive(Deserialize, Clone, Debug, PartialEq, Eq)] +#[serde(rename_all = "lowercase")] +pub enum TableIoSource { + Uri(String), + Memory(Vec), +} + +impl> From for TableIoSource { + fn from(s: T) -> Self { + Self::Uri(s.into()) + } +} + +impl TableIoSource { + pub fn as_memory(&self) -> Result<&[u8], ColumnQError> { + match self { + Self::Memory(data) => Ok(&data), + other => Err(ColumnQError::Generic(format!( + "expect memory IO source, got: {:?}", + other + ))), + } + } +} + +impl std::fmt::Display for TableIoSource { + fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { + match self { + TableIoSource::Uri(uri) => write!(f, "uri({})", uri), + TableIoSource::Memory(_) => write!(f, "memory"), } } } @@ -100,38 +206,81 @@ impl TableLoadOption { #[serde(deny_unknown_fields)] pub struct TableSource { pub name: String, - pub uri: String, + #[serde(flatten)] + pub io_source: TableIoSource, pub schema: Option, pub option: Option, } impl TableSource { - pub fn new(name: String, uri: String) -> Self { - // TODO: parse table format from uri during initializeion? + pub fn new(name: impl Into, source: impl Into) -> Self { Self { - name, - uri, + name: name.into(), + io_source: source.into(), schema: None, option: None, } } + pub fn new_with_uri(name: impl Into, uri: impl Into) -> Self { + Self::new(name, uri.into()) + } + + pub fn with_option(mut self, option: impl Into) -> Self { + self.option = Some(option.into()); + self + } + + pub fn with_schema(mut self, schema: impl Into) -> Self { + self.schema = Some(schema.into()); + self + } + + pub fn get_uri_str(&self) -> &str { + match &self.io_source { + TableIoSource::Uri(uri) => uri.as_str(), + TableIoSource::Memory(_) => "memory", + } + } + pub fn parsed_uri(&self) -> Result { - URIReference::try_from(self.uri.as_str()) - .map_err(|_| ColumnQError::InvalidUri(self.uri.clone())) + match &self.io_source { + TableIoSource::Uri(uri) => URIReference::try_from(uri.as_str()) + .map_err(|_| ColumnQError::InvalidUri(uri.to_string())), + TableIoSource::Memory(_) => URIReference::builder() + .with_scheme(Some(uriparse::Scheme::try_from("memory").map_err(|e| { + ColumnQError::Generic(format!( + "failed to create uri scheme for memory IO source: {:?}", + e + )) + })?)) + .with_path(uriparse::Path::try_from("data").map_err(|e| { + ColumnQError::Generic(format!( + "failed to create uri path for memory IO source: {:?}", + e + )) + })?) + .build() + .map_err(|e| { + ColumnQError::Generic(format!( + "failed to create uri for memory IO source: {:?}", + e + )) + }), + } } pub fn extension(&self) -> Result<&str, ColumnQError> { - Ok(match &self.option { - Some(opt) => opt.extension(), - None => { - let ext = Path::new(&self.uri) + Ok(match (&self.option, &self.io_source) { + (Some(opt), _) => opt.extension(), + (None, TableIoSource::Uri(uri)) => { + let ext = Path::new(uri) .extension() .and_then(OsStr::to_str) .ok_or_else(|| { ColumnQError::InvalidUri(format!( "cannot detect table extension from uri: {}", - self.uri + uri )) })?; @@ -140,11 +289,16 @@ impl TableSource { _ => { return Err(ColumnQError::InvalidUri(format!( "unsupported extension in uri: {}", - self.uri + uri ))); } } } + (None, TableIoSource::Memory(_)) => { + return Err(ColumnQError::Generic( + "cannot detect table extension from memory IO source, please specify a format option".to_string() + )); + } }) } } @@ -162,24 +316,50 @@ pub async fn load(t: &TableSource) -> Result csv::to_mem_table(t).await?, - Some("json") => json::to_mem_table(t).await?, - Some("ndjson") => ndjson::to_mem_table(t).await?, - Some("parquet") => parquet::to_mem_table(t).await?, - Some(ext) => { - return Err(ColumnQError::InvalidUri(format!( - "failed to register `{}` as table `{}`, unsupported table format `{}`", - t.uri, t.name, ext, - ))); - } - None => { - return Err(ColumnQError::InvalidUri(format!( - "failed to register `{}` as table `{}`, cannot detect table format", - t.uri, t.name - ))); - } - }, - ) + let t = match t.extension()? { + "csv" => csv::to_mem_table(t).await?, + "json" => json::to_mem_table(t).await?, + "ndjson" => ndjson::to_mem_table(t).await?, + "parquet" => parquet::to_mem_table(t).await?, + ext => { + return Err(ColumnQError::InvalidUri(format!( + "failed to register `{}` as table `{}`, unsupported table format `{}`", + t.io_source, t.name, ext, + ))); + } + }; + + Ok(t) +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn uri_deserialization() -> anyhow::Result<()> { + let table_source: TableSource = serde_yaml::from_str( + r#" +name: "ubuntu_ami" +uri: "test_data/ubuntu-ami.json" +option: + format: "json" + pointer: "/aaData" + array_encoded: true +schema: + columns: + - name: "zone" + data_type: "Utf8" + - name: "name" + data_type: "Utf8" +"#, + )?; + + assert_eq!( + table_source.io_source, + TableIoSource::Uri("test_data/ubuntu-ami.json".to_string()) + ); + + Ok(()) + } } diff --git a/columnq/src/table/parquet.rs b/columnq/src/table/parquet.rs index 3d975ee..7997cba 100644 --- a/columnq/src/table/parquet.rs +++ b/columnq/src/table/parquet.rs @@ -77,12 +77,10 @@ mod tests { #[tokio::test] async fn load_simple_parquet() -> Result<(), ColumnQError> { - let t = to_mem_table(&TableSource { - name: "blogs".to_string(), - uri: test_data_path("blogs.parquet"), - schema: None, - option: None, - }) + let t = to_mem_table(&TableSource::new( + "blogs".to_string(), + test_data_path("blogs.parquet"), + )) .await?; let schema = t.schema(); @@ -109,12 +107,10 @@ mod tests { assert!(fs::copy(&source_path, tmp_dir_path.join("2020-01-02.parquet"))? > 0); assert!(fs::copy(&source_path, tmp_dir_path.join("2020-01-03.parquet"))? > 0); - let t = to_mem_table(&TableSource { - name: "blogs".to_string(), - uri: tmp_dir_path.to_string_lossy().to_string(), - schema: None, - option: Some(TableLoadOption::parquet {}), - }) + let t = to_mem_table( + &TableSource::new_with_uri("blogs", tmp_dir_path.to_string_lossy()) + .with_option(TableLoadOption::parquet {}), + ) .await?; assert_eq!( diff --git a/columnq/src/test_util.rs b/columnq/src/test_util.rs index 7cd16e1..8fd0059 100644 --- a/columnq/src/test_util.rs +++ b/columnq/src/test_util.rs @@ -119,7 +119,8 @@ schema: "#, )?; - table_source.uri = test_data_path("ubuntu-ami.json"); + // patch uri path with the correct test data path + table_source.io_source = table::TableIoSource::Uri(test_data_path("ubuntu-ami.json")); Ok(table::load(&table_source).await?) } diff --git a/roapi-http/src/api/mod.rs b/roapi-http/src/api/mod.rs index 28a3c4a..000e329 100644 --- a/roapi-http/src/api/mod.rs +++ b/roapi-http/src/api/mod.rs @@ -22,9 +22,9 @@ impl HandlerContext { } for t in config.tables.iter() { - info!("loading `{}` as table `{}`", t.uri, t.name); + info!("loading `{}` as table `{}`", t.io_source, t.name); cq.load_table(t).await?; - info!("registered `{}` as table `{}`", t.uri, t.name); + info!("registered `{}` as table `{}`", t.io_source, t.name); } Ok(Self { cq }) diff --git a/roapi-http/src/api/routes.rs b/roapi-http/src/api/routes.rs index 58bfc38..c6ab9c8 100644 --- a/roapi-http/src/api/routes.rs +++ b/roapi-http/src/api/routes.rs @@ -44,12 +44,10 @@ mod tests { web::Data::new( HandlerContext::new(&Config { addr: None, - tables: vec![TableSource { - name: "uk_cities".to_string(), - uri: test_data_path("uk_cities_with_headers.csv"), - schema: None, - option: None, - }], + tables: vec![TableSource::new( + "uk_cities", + test_data_path("uk_cities_with_headers.csv"), + )], }) .await .unwrap(), @@ -60,58 +58,57 @@ mod tests { web::Data::new( HandlerContext::new(&Config { addr: None, - tables: vec![TableSource { - name: "ubuntu_ami".to_string(), - uri: test_data_path("ubuntu-ami.json"), - option: Some(TableLoadOption::json { - pointer: Some("/aaData".to_string()), - array_encoded: Some(true), - }), - schema: Some(TableSchema { - columns: vec![ - TableColumn { - name: "zone".to_string(), - data_type: arrow::datatypes::DataType::Utf8, - nullable: true, - }, - TableColumn { - name: "name".to_string(), - data_type: arrow::datatypes::DataType::Utf8, - nullable: true, - }, - TableColumn { - name: "version".to_string(), - data_type: arrow::datatypes::DataType::Utf8, - nullable: true, - }, - TableColumn { - name: "arch".to_string(), - data_type: arrow::datatypes::DataType::Utf8, - nullable: true, - }, - TableColumn { - name: "instance_type".to_string(), - data_type: arrow::datatypes::DataType::Utf8, - nullable: true, - }, - TableColumn { - name: "release".to_string(), - data_type: arrow::datatypes::DataType::Utf8, - nullable: true, - }, - TableColumn { - name: "ami_id".to_string(), - data_type: arrow::datatypes::DataType::Utf8, - nullable: true, - }, - TableColumn { - name: "aki_id".to_string(), - data_type: arrow::datatypes::DataType::Utf8, - nullable: true, - }, - ], - }), - }], + tables: vec![ + TableSource::new("ubuntu_ami", test_data_path("ubuntu-ami.json")) + .with_option(TableLoadOption::json { + pointer: Some("/aaData".to_string()), + array_encoded: Some(true), + }) + .with_schema(TableSchema { + columns: vec![ + TableColumn { + name: "zone".to_string(), + data_type: arrow::datatypes::DataType::Utf8, + nullable: true, + }, + TableColumn { + name: "name".to_string(), + data_type: arrow::datatypes::DataType::Utf8, + nullable: true, + }, + TableColumn { + name: "version".to_string(), + data_type: arrow::datatypes::DataType::Utf8, + nullable: true, + }, + TableColumn { + name: "arch".to_string(), + data_type: arrow::datatypes::DataType::Utf8, + nullable: true, + }, + TableColumn { + name: "instance_type".to_string(), + data_type: arrow::datatypes::DataType::Utf8, + nullable: true, + }, + TableColumn { + name: "release".to_string(), + data_type: arrow::datatypes::DataType::Utf8, + nullable: true, + }, + TableColumn { + name: "ami_id".to_string(), + data_type: arrow::datatypes::DataType::Utf8, + nullable: true, + }, + TableColumn { + name: "aki_id".to_string(), + data_type: arrow::datatypes::DataType::Utf8, + nullable: true, + }, + ], + }), + ], }) .await .unwrap(),