From 95f9ced23d25aa71871db8a899a6b0d56c1a07f1 Mon Sep 17 00:00:00 2001 From: Qingping Hou Date: Thu, 22 Jul 2021 23:08:32 -0700 Subject: [PATCH] support deriving table name from table uri --- Cargo.lock | 5 +- README.md | 10 +- columnq-cli/Cargo.toml | 3 + columnq-cli/README.md | 4 +- columnq-cli/src/main.rs | 54 +-------- columnq/Cargo.toml | 2 +- columnq/src/table/mod.rs | 103 +++++++++++++++++- roapi-http/Cargo.toml | 2 +- roapi-http/src/main.rs | 35 +++--- ...cex-launches.json => spacex_launches.json} | 0 10 files changed, 130 insertions(+), 88 deletions(-) rename test_data/{spacex-launches.json => spacex_launches.json} (100%) diff --git a/Cargo.lock b/Cargo.lock index e9a4f26..edbf740 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -562,7 +562,7 @@ dependencies = [ [[package]] name = "columnq" -version = "0.3.1" +version = "0.3.3" dependencies = [ "anyhow", "arrow", @@ -605,6 +605,7 @@ dependencies = [ "dirs 3.0.2", "env_logger", "log", + "pretty_assertions", "rustyline", "serde_json", "tokio", @@ -2239,7 +2240,7 @@ dependencies = [ [[package]] name = "roapi-http" -version = "0.3.1" +version = "0.3.3" dependencies = [ "actix-cors", "actix-http", diff --git a/README.md b/README.md index 574d165..dc5299e 100644 --- a/README.md +++ b/README.md @@ -50,20 +50,20 @@ cargo install --locked --git https://github.com/roapi/roapi --branch main --bin ### Quick start Spin up APIs for `test_data/uk_cities_with_headers.csv` and -`test_data/spacex-launches.json`: +`test_data/spacex_launches.json`: ```bash roapi-http \ - --table 'uk_cities:test_data/uk_cities_with_headers.csv' \ - --table 'spacex_launches:test_data/spacex-launches.json' + --table 'uk_cities=test_data/uk_cities_with_headers.csv' \ + --table 'test_data/spacex_launches.json' ``` Or using docker: ```bash docker run -t --rm -p 8080:8080 ghcr.io/roapi/roapi-http:latest --addr 0.0.0.0:8080 \ - --table 'uk_cities:test_data/uk_cities_with_headers.csv' \ - --table 'spacex_launches:test_data/spacex-launches.json' + --table 'uk_cities=test_data/uk_cities_with_headers.csv' \ + --table 'test_data/spacex_launches.json' ``` Query tables using SQL, GraphQL or REST: diff --git a/columnq-cli/Cargo.toml b/columnq-cli/Cargo.toml index 6e639b4..c7f987c 100644 --- a/columnq-cli/Cargo.toml +++ b/columnq-cli/Cargo.toml @@ -20,3 +20,6 @@ env_logger = { version = "0" } anyhow = { version = "1" } clap = { version = ">=3.0.0-beta.2,<4", features = ["color"] } dirs = { version = "3" } + +[dev-dependencies] +pretty_assertions = "*" diff --git a/columnq-cli/README.md b/columnq-cli/README.md index e5142de..be668b8 100644 --- a/columnq-cli/README.md +++ b/columnq-cli/README.md @@ -25,6 +25,6 @@ The Columnq CLI can also be used as a handy utility to Convert tabular data between various formats: `json`, `parquet`, `csv`, `yaml`, `arrow`, etc. ``` -columnq sql --table 't:test_data/uk_cities_with_headers.csv' 'SELECT * FROM t' --output json -cat test_data/blogs.parquet | columnq sql --table 't:stdin,format=parquet' 'SELECT * FROM t' --output json +$ columnq sql --table 't=test_data/uk_cities_with_headers.csv' 'SELECT * FROM t' --output json +$ cat test_data/blogs.parquet | columnq sql --table 't=stdin,format=parquet' 'SELECT * FROM t' --output json ``` diff --git a/columnq-cli/src/main.rs b/columnq-cli/src/main.rs index 9d71276..f364b38 100644 --- a/columnq-cli/src/main.rs +++ b/columnq-cli/src/main.rs @@ -3,10 +3,9 @@ use arrow::util::pretty; use log::debug; use rustyline::error::ReadlineError; use rustyline::Editor; -use std::io::Read; use std::path::PathBuf; -use columnq::table::{TableIoSource, TableLoadOption, TableSource}; +use columnq::table::parse_table_uri_arg; use columnq::{encoding, ColumnQ, ExecutionConfig}; fn config_path() -> anyhow::Result { @@ -21,63 +20,16 @@ fn config_path() -> anyhow::Result { fn table_arg() -> clap::Arg<'static> { clap::Arg::new("table") - .about("Table sources to load. Table option can be provided as optional setting as part of the table URI, for example: blogs:s3://bucket/key,format=delta. Set table uri to `stdin` if you want to consume table data from stdin as part of a UNIX pipe.") + .about("Table sources to load. Table option can be provided as optional setting as part of the table URI, for example: `blogs=s3://bucket/key,format=delta`. Set table uri to `stdin` if you want to consume table data from stdin as part of a UNIX pipe. If no table_name is provided, a table name will be derived from the filename in URI.") .takes_value(true) .required(false) .number_of_values(1) .multiple(true) - .value_name("table_name:uri[,option_key=option_value]") + .value_name("[table_name=]uri[,option_key=option_value]") .long("table") .short('t') } -fn parse_table_uri_arg(uri_arg: &str) -> anyhow::Result { - let mut split = uri_arg.splitn(2, ':'); - let table_name = split - .next() - .ok_or_else(|| anyhow::anyhow!("invalid table config: {}", uri_arg))?; - let uri = split - .next() - .ok_or_else(|| anyhow::anyhow!("invalid table config: {}", uri_arg))?; - - // separate uri from table load options - let mut uri_parts = uri.split(','); - let uri = uri_parts - .next() - .ok_or_else(|| anyhow::anyhow!("invalid table URI: {}", uri))?; - - let t = if uri == "stdin" { - let mut buffer = Vec::new(); - std::io::stdin().read_to_end(&mut buffer)?; - TableSource::new(table_name, TableIoSource::Memory(buffer)) - } else { - TableSource::new(table_name, uri.to_string()) - }; - - // parse extra options from table uri - let mut option_json = serde_json::map::Map::new(); - for opt_str in uri_parts.into_iter() { - let mut parts = opt_str.splitn(2, '='); - let opt_key = parts - .next() - .ok_or_else(|| anyhow::anyhow!("invalid table option: {:?}", opt_str))?; - let opt_value = parts - .next() - .ok_or_else(|| anyhow::anyhow!("invalid table option: {:?}", opt_str))?; - option_json.insert( - opt_key.to_string(), - serde_json::from_str(opt_value).unwrap_or_else(|_| opt_value.into()), - ); - } - - if option_json.len() > 0 { - let opt: TableLoadOption = serde_json::from_value(serde_json::Value::Object(option_json))?; - Ok(t.with_option(opt)) - } else { - Ok(t) - } -} - async fn console_loop(cq: &ColumnQ) -> anyhow::Result<()> { let mut path = config_path()?; if !path.as_path().exists() { diff --git a/columnq/Cargo.toml b/columnq/Cargo.toml index 208490b..afa150f 100644 --- a/columnq/Cargo.toml +++ b/columnq/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "columnq" -version = "0.3.2" +version = "0.3.3" authors = ["Qingping Hou "] edition = "2018" diff --git a/columnq/src/table/mod.rs b/columnq/src/table/mod.rs index 56d2fef..79cd1ca 100644 --- a/columnq/src/table/mod.rs +++ b/columnq/src/table/mod.rs @@ -1,5 +1,6 @@ use std::convert::TryFrom; use std::ffi::OsStr; +use std::io::Read; use std::path::Path; use serde::de::{Deserialize, Deserializer}; @@ -15,7 +16,7 @@ pub mod json; pub mod ndjson; pub mod parquet; -#[derive(Deserialize, Clone, Debug)] +#[derive(Deserialize, Clone, Debug, Eq, PartialEq)] #[serde(deny_unknown_fields)] pub struct TableColumn { pub name: String, @@ -30,7 +31,7 @@ impl From<&TableColumn> for arrow::datatypes::Field { } } -#[derive(Deserialize, Clone, Debug)] +#[derive(Deserialize, Clone, Debug, Eq, PartialEq)] #[serde(deny_unknown_fields)] pub struct TableSchema { pub columns: Vec, @@ -47,14 +48,14 @@ impl From<&TableSchema> for arrow::datatypes::Schema { } } -#[derive(Deserialize, Debug, Clone)] +#[derive(Deserialize, Debug, Clone, Eq, PartialEq)] pub struct TableOptionGoogleSpreasheet { application_secret_path: String, sheet_title: Option, } /// CSV table load option -#[derive(Deserialize, Debug, Clone)] +#[derive(Deserialize, Debug, Clone, Eq, PartialEq)] pub struct TableOptionCsv { #[serde(default = "TableOptionCsv::default_has_header")] has_header: bool, @@ -124,7 +125,7 @@ impl Default for TableOptionCsv { // * update load #[allow(non_camel_case_types)] -#[derive(Deserialize, Clone, Debug)] +#[derive(Deserialize, Clone, Debug, Eq, PartialEq)] #[serde(tag = "format")] #[serde(deny_unknown_fields)] pub enum TableLoadOption { @@ -202,7 +203,7 @@ impl std::fmt::Display for TableIoSource { } } -#[derive(Deserialize, Clone, Debug)] +#[derive(Deserialize, Clone, Debug, Eq, PartialEq)] #[serde(deny_unknown_fields)] pub struct TableSource { pub name: String, @@ -332,9 +333,75 @@ pub async fn load(t: &TableSource) -> Result Result { + // separate uri from table load options + let mut uri_args = uri_arg.split(','); + + let uri = uri_args + .next() + .ok_or_else(|| ColumnQError::Generic(format!("invalid table URI argument: {}", uri_arg)))?; + let split = uri.splitn(2, '=').collect::>(); + + let (table_name, uri) = match split.len() { + 1 => { + let uri = split[0]; + let table_name = match Path::new(uri).file_stem() { + Some(s) => Ok(s), + None => Path::new(uri) + .file_name() + .ok_or_else(|| ColumnQError::Generic(format!("invalid table URI: {}", uri))), + }? + .to_str() + .ok_or_else(|| ColumnQError::Generic(format!("invalid table URI string: {}", uri)))?; + + (table_name, uri) + } + 2 => (split[0], split[1]), + _ => unreachable!(), + }; + + let t = if uri == "stdin" { + let mut buffer = Vec::new(); + std::io::stdin().read_to_end(&mut buffer).map_err(|e| { + ColumnQError::Generic(format!("Failed to read table data from stdin: {:?}", e)) + })?; + TableSource::new(table_name, TableIoSource::Memory(buffer)) + } else { + TableSource::new(table_name, uri.to_string()) + }; + + // parse extra options from table uri + let mut option_json = serde_json::map::Map::new(); + for opt_str in uri_args.into_iter() { + let mut parts = opt_str.splitn(2, '='); + let opt_key = parts + .next() + .ok_or_else(|| ColumnQError::Generic(format!("invalid table option: {:?}", opt_str)))?; + let opt_value = parts + .next() + .ok_or_else(|| ColumnQError::Generic(format!("invalid table option: {:?}", opt_str)))?; + option_json.insert( + opt_key.to_string(), + serde_json::from_str(opt_value).unwrap_or_else(|_| opt_value.into()), + ); + } + + if !option_json.is_empty() { + let opt: TableLoadOption = serde_json::from_value(serde_json::Value::Object(option_json)) + .map_err(|e| { + ColumnQError::Generic(format!("Failed to parse table option: {:?}", e)) + })?; + Ok(t.with_option(opt)) + } else { + Ok(t) + } +} + #[cfg(test)] mod tests { use super::*; + use pretty_assertions::assert_eq; #[test] fn uri_deserialization() -> anyhow::Result<()> { @@ -362,4 +429,28 @@ schema: Ok(()) } + + #[test] + fn test_parse_table_uri() { + let t = parse_table_uri_arg("t=a/b/c").unwrap(); + assert_eq!(TableSource::new("t", "a/b/c"), t); + + let t = parse_table_uri_arg("t=s3://a/b/c,format=csv,has_header=true").unwrap(); + assert_eq!( + TableSource::new("t", "s3://a/b/c").with_option(TableLoadOption::csv( + TableOptionCsv::default().with_has_header(true), + )), + t + ); + + let t = parse_table_uri_arg("s3://a/b/foo.csv").unwrap(); + assert_eq!(TableSource::new("foo", "s3://a/b/foo.csv"), t); + + let t = parse_table_uri_arg("s3://a/b/foo.csv,format=csv").unwrap(); + assert_eq!( + TableSource::new("foo", "s3://a/b/foo.csv") + .with_option(TableLoadOption::csv(TableOptionCsv::default())), + t + ); + } } diff --git a/roapi-http/Cargo.toml b/roapi-http/Cargo.toml index e65caed..7329114 100644 --- a/roapi-http/Cargo.toml +++ b/roapi-http/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "roapi-http" -version = "0.3.2" +version = "0.3.3" authors = ["Qingping Hou "] edition = "2018" diff --git a/roapi-http/src/main.rs b/roapi-http/src/main.rs index 6318103..15983bf 100644 --- a/roapi-http/src/main.rs +++ b/roapi-http/src/main.rs @@ -5,12 +5,24 @@ use std::fs; use actix_cors::Cors; use actix_web::{middleware, web, App, HttpServer}; use anyhow::Context; -use columnq::table::TableSource; +use columnq::table::parse_table_uri_arg; use roapi_http::api; use roapi_http::api::HandlerContext; use roapi_http::config::Config; +fn table_arg() -> clap::Arg<'static> { + clap::Arg::new("table") + .about("Table sources to load. Table option can be provided as optional setting as part of the table URI, for example: `blogs=s3://bucket/key,format=delta`. Set table uri to `stdin` if you want to consume table data from stdin as part of a UNIX pipe. If no table_name is provided, a table name will be derived from the filename in URI.") + .takes_value(true) + .required(false) + .number_of_values(1) + .multiple(true) + .value_name("[table_name=]uri[,option_key=option_value]") + .long("table") + .short('t') +} + #[actix_web::main] async fn main() -> anyhow::Result<()> { env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("info")).init(); @@ -36,15 +48,7 @@ async fn main() -> anyhow::Result<()> { .takes_value(true) .long("config") .short('c'), - clap::Arg::new("table") - .about("table sources to load") - .takes_value(true) - .required(false) - .number_of_values(1) - .multiple(true) - .value_name("table_name:uri") - .long("table") - .short('t'), + table_arg(), ]) .get_matches(); @@ -60,16 +64,7 @@ async fn main() -> anyhow::Result<()> { if let Some(tables) = matches.values_of("table") { for v in tables { - let mut split = v.splitn(2, ':'); - let table_name = split - .next() - .ok_or_else(|| anyhow::anyhow!("invalid table config: {}", v))?; - let uri = split - .next() - .ok_or_else(|| anyhow::anyhow!("invalid table config: {}", v))?; - config - .tables - .push(TableSource::new(table_name.to_string(), uri.to_string())); + config.tables.push(parse_table_uri_arg(v)?); } } diff --git a/test_data/spacex-launches.json b/test_data/spacex_launches.json similarity index 100% rename from test_data/spacex-launches.json rename to test_data/spacex_launches.json