support deriving table name from table uri

This commit is contained in:
Qingping Hou 2021-07-22 23:08:32 -07:00 committed by QP Hou
parent e5989d1f9d
commit 95f9ced23d
10 changed files with 130 additions and 88 deletions

5
Cargo.lock generated
View File

@ -562,7 +562,7 @@ dependencies = [
[[package]]
name = "columnq"
version = "0.3.1"
version = "0.3.3"
dependencies = [
"anyhow",
"arrow",
@ -605,6 +605,7 @@ dependencies = [
"dirs 3.0.2",
"env_logger",
"log",
"pretty_assertions",
"rustyline",
"serde_json",
"tokio",
@ -2239,7 +2240,7 @@ dependencies = [
[[package]]
name = "roapi-http"
version = "0.3.1"
version = "0.3.3"
dependencies = [
"actix-cors",
"actix-http",

View File

@ -50,20 +50,20 @@ cargo install --locked --git https://github.com/roapi/roapi --branch main --bin
### Quick start
Spin up APIs for `test_data/uk_cities_with_headers.csv` and
`test_data/spacex-launches.json`:
`test_data/spacex_launches.json`:
```bash
roapi-http \
--table 'uk_cities:test_data/uk_cities_with_headers.csv' \
--table 'spacex_launches:test_data/spacex-launches.json'
--table 'uk_cities=test_data/uk_cities_with_headers.csv' \
--table 'test_data/spacex_launches.json'
```
Or using docker:
```bash
docker run -t --rm -p 8080:8080 ghcr.io/roapi/roapi-http:latest --addr 0.0.0.0:8080 \
--table 'uk_cities:test_data/uk_cities_with_headers.csv' \
--table 'spacex_launches:test_data/spacex-launches.json'
--table 'uk_cities=test_data/uk_cities_with_headers.csv' \
--table 'test_data/spacex_launches.json'
```
Query tables using SQL, GraphQL or REST:

View File

@ -20,3 +20,6 @@ env_logger = { version = "0" }
anyhow = { version = "1" }
clap = { version = ">=3.0.0-beta.2,<4", features = ["color"] }
dirs = { version = "3" }
[dev-dependencies]
pretty_assertions = "*"

View File

@ -25,6 +25,6 @@ The Columnq CLI can also be used as a handy utility to Convert tabular data
between various formats: `json`, `parquet`, `csv`, `yaml`, `arrow`, etc.
```
columnq sql --table 't:test_data/uk_cities_with_headers.csv' 'SELECT * FROM t' --output json
cat test_data/blogs.parquet | columnq sql --table 't:stdin,format=parquet' 'SELECT * FROM t' --output json
$ columnq sql --table 't=test_data/uk_cities_with_headers.csv' 'SELECT * FROM t' --output json
$ cat test_data/blogs.parquet | columnq sql --table 't=stdin,format=parquet' 'SELECT * FROM t' --output json
```

View File

@ -3,10 +3,9 @@ use arrow::util::pretty;
use log::debug;
use rustyline::error::ReadlineError;
use rustyline::Editor;
use std::io::Read;
use std::path::PathBuf;
use columnq::table::{TableIoSource, TableLoadOption, TableSource};
use columnq::table::parse_table_uri_arg;
use columnq::{encoding, ColumnQ, ExecutionConfig};
fn config_path() -> anyhow::Result<PathBuf> {
@ -21,63 +20,16 @@ fn config_path() -> anyhow::Result<PathBuf> {
fn table_arg() -> clap::Arg<'static> {
clap::Arg::new("table")
.about("Table sources to load. Table option can be provided as optional setting as part of the table URI, for example: blogs:s3://bucket/key,format=delta. Set table uri to `stdin` if you want to consume table data from stdin as part of a UNIX pipe.")
.about("Table sources to load. Table option can be provided as optional setting as part of the table URI, for example: `blogs=s3://bucket/key,format=delta`. Set table uri to `stdin` if you want to consume table data from stdin as part of a UNIX pipe. If no table_name is provided, a table name will be derived from the filename in URI.")
.takes_value(true)
.required(false)
.number_of_values(1)
.multiple(true)
.value_name("table_name:uri[,option_key=option_value]")
.value_name("[table_name=]uri[,option_key=option_value]")
.long("table")
.short('t')
}
fn parse_table_uri_arg(uri_arg: &str) -> anyhow::Result<TableSource> {
let mut split = uri_arg.splitn(2, ':');
let table_name = split
.next()
.ok_or_else(|| anyhow::anyhow!("invalid table config: {}", uri_arg))?;
let uri = split
.next()
.ok_or_else(|| anyhow::anyhow!("invalid table config: {}", uri_arg))?;
// separate uri from table load options
let mut uri_parts = uri.split(',');
let uri = uri_parts
.next()
.ok_or_else(|| anyhow::anyhow!("invalid table URI: {}", uri))?;
let t = if uri == "stdin" {
let mut buffer = Vec::new();
std::io::stdin().read_to_end(&mut buffer)?;
TableSource::new(table_name, TableIoSource::Memory(buffer))
} else {
TableSource::new(table_name, uri.to_string())
};
// parse extra options from table uri
let mut option_json = serde_json::map::Map::new();
for opt_str in uri_parts.into_iter() {
let mut parts = opt_str.splitn(2, '=');
let opt_key = parts
.next()
.ok_or_else(|| anyhow::anyhow!("invalid table option: {:?}", opt_str))?;
let opt_value = parts
.next()
.ok_or_else(|| anyhow::anyhow!("invalid table option: {:?}", opt_str))?;
option_json.insert(
opt_key.to_string(),
serde_json::from_str(opt_value).unwrap_or_else(|_| opt_value.into()),
);
}
if option_json.len() > 0 {
let opt: TableLoadOption = serde_json::from_value(serde_json::Value::Object(option_json))?;
Ok(t.with_option(opt))
} else {
Ok(t)
}
}
async fn console_loop(cq: &ColumnQ) -> anyhow::Result<()> {
let mut path = config_path()?;
if !path.as_path().exists() {

View File

@ -1,6 +1,6 @@
[package]
name = "columnq"
version = "0.3.2"
version = "0.3.3"
authors = ["Qingping Hou <dave2008713@gmail.com>"]
edition = "2018"

View File

@ -1,5 +1,6 @@
use std::convert::TryFrom;
use std::ffi::OsStr;
use std::io::Read;
use std::path::Path;
use serde::de::{Deserialize, Deserializer};
@ -15,7 +16,7 @@ pub mod json;
pub mod ndjson;
pub mod parquet;
#[derive(Deserialize, Clone, Debug)]
#[derive(Deserialize, Clone, Debug, Eq, PartialEq)]
#[serde(deny_unknown_fields)]
pub struct TableColumn {
pub name: String,
@ -30,7 +31,7 @@ impl From<&TableColumn> for arrow::datatypes::Field {
}
}
#[derive(Deserialize, Clone, Debug)]
#[derive(Deserialize, Clone, Debug, Eq, PartialEq)]
#[serde(deny_unknown_fields)]
pub struct TableSchema {
pub columns: Vec<TableColumn>,
@ -47,14 +48,14 @@ impl From<&TableSchema> for arrow::datatypes::Schema {
}
}
#[derive(Deserialize, Debug, Clone)]
#[derive(Deserialize, Debug, Clone, Eq, PartialEq)]
pub struct TableOptionGoogleSpreasheet {
application_secret_path: String,
sheet_title: Option<String>,
}
/// CSV table load option
#[derive(Deserialize, Debug, Clone)]
#[derive(Deserialize, Debug, Clone, Eq, PartialEq)]
pub struct TableOptionCsv {
#[serde(default = "TableOptionCsv::default_has_header")]
has_header: bool,
@ -124,7 +125,7 @@ impl Default for TableOptionCsv {
// * update load
#[allow(non_camel_case_types)]
#[derive(Deserialize, Clone, Debug)]
#[derive(Deserialize, Clone, Debug, Eq, PartialEq)]
#[serde(tag = "format")]
#[serde(deny_unknown_fields)]
pub enum TableLoadOption {
@ -202,7 +203,7 @@ impl std::fmt::Display for TableIoSource {
}
}
#[derive(Deserialize, Clone, Debug)]
#[derive(Deserialize, Clone, Debug, Eq, PartialEq)]
#[serde(deny_unknown_fields)]
pub struct TableSource {
pub name: String,
@ -332,9 +333,75 @@ pub async fn load(t: &TableSource) -> Result<datafusion::datasource::MemTable, C
Ok(t)
}
/// For parsing table URI arg in CLI
pub fn parse_table_uri_arg(uri_arg: &str) -> Result<TableSource, ColumnQError> {
// separate uri from table load options
let mut uri_args = uri_arg.split(',');
let uri = uri_args
.next()
.ok_or_else(|| ColumnQError::Generic(format!("invalid table URI argument: {}", uri_arg)))?;
let split = uri.splitn(2, '=').collect::<Vec<&str>>();
let (table_name, uri) = match split.len() {
1 => {
let uri = split[0];
let table_name = match Path::new(uri).file_stem() {
Some(s) => Ok(s),
None => Path::new(uri)
.file_name()
.ok_or_else(|| ColumnQError::Generic(format!("invalid table URI: {}", uri))),
}?
.to_str()
.ok_or_else(|| ColumnQError::Generic(format!("invalid table URI string: {}", uri)))?;
(table_name, uri)
}
2 => (split[0], split[1]),
_ => unreachable!(),
};
let t = if uri == "stdin" {
let mut buffer = Vec::new();
std::io::stdin().read_to_end(&mut buffer).map_err(|e| {
ColumnQError::Generic(format!("Failed to read table data from stdin: {:?}", e))
})?;
TableSource::new(table_name, TableIoSource::Memory(buffer))
} else {
TableSource::new(table_name, uri.to_string())
};
// parse extra options from table uri
let mut option_json = serde_json::map::Map::new();
for opt_str in uri_args.into_iter() {
let mut parts = opt_str.splitn(2, '=');
let opt_key = parts
.next()
.ok_or_else(|| ColumnQError::Generic(format!("invalid table option: {:?}", opt_str)))?;
let opt_value = parts
.next()
.ok_or_else(|| ColumnQError::Generic(format!("invalid table option: {:?}", opt_str)))?;
option_json.insert(
opt_key.to_string(),
serde_json::from_str(opt_value).unwrap_or_else(|_| opt_value.into()),
);
}
if !option_json.is_empty() {
let opt: TableLoadOption = serde_json::from_value(serde_json::Value::Object(option_json))
.map_err(|e| {
ColumnQError::Generic(format!("Failed to parse table option: {:?}", e))
})?;
Ok(t.with_option(opt))
} else {
Ok(t)
}
}
#[cfg(test)]
mod tests {
use super::*;
use pretty_assertions::assert_eq;
#[test]
fn uri_deserialization() -> anyhow::Result<()> {
@ -362,4 +429,28 @@ schema:
Ok(())
}
#[test]
fn test_parse_table_uri() {
let t = parse_table_uri_arg("t=a/b/c").unwrap();
assert_eq!(TableSource::new("t", "a/b/c"), t);
let t = parse_table_uri_arg("t=s3://a/b/c,format=csv,has_header=true").unwrap();
assert_eq!(
TableSource::new("t", "s3://a/b/c").with_option(TableLoadOption::csv(
TableOptionCsv::default().with_has_header(true),
)),
t
);
let t = parse_table_uri_arg("s3://a/b/foo.csv").unwrap();
assert_eq!(TableSource::new("foo", "s3://a/b/foo.csv"), t);
let t = parse_table_uri_arg("s3://a/b/foo.csv,format=csv").unwrap();
assert_eq!(
TableSource::new("foo", "s3://a/b/foo.csv")
.with_option(TableLoadOption::csv(TableOptionCsv::default())),
t
);
}
}

View File

@ -1,6 +1,6 @@
[package]
name = "roapi-http"
version = "0.3.2"
version = "0.3.3"
authors = ["Qingping Hou <dave2008713@gmail.com>"]
edition = "2018"

View File

@ -5,12 +5,24 @@ use std::fs;
use actix_cors::Cors;
use actix_web::{middleware, web, App, HttpServer};
use anyhow::Context;
use columnq::table::TableSource;
use columnq::table::parse_table_uri_arg;
use roapi_http::api;
use roapi_http::api::HandlerContext;
use roapi_http::config::Config;
fn table_arg() -> clap::Arg<'static> {
clap::Arg::new("table")
.about("Table sources to load. Table option can be provided as optional setting as part of the table URI, for example: `blogs=s3://bucket/key,format=delta`. Set table uri to `stdin` if you want to consume table data from stdin as part of a UNIX pipe. If no table_name is provided, a table name will be derived from the filename in URI.")
.takes_value(true)
.required(false)
.number_of_values(1)
.multiple(true)
.value_name("[table_name=]uri[,option_key=option_value]")
.long("table")
.short('t')
}
#[actix_web::main]
async fn main() -> anyhow::Result<()> {
env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("info")).init();
@ -36,15 +48,7 @@ async fn main() -> anyhow::Result<()> {
.takes_value(true)
.long("config")
.short('c'),
clap::Arg::new("table")
.about("table sources to load")
.takes_value(true)
.required(false)
.number_of_values(1)
.multiple(true)
.value_name("table_name:uri")
.long("table")
.short('t'),
table_arg(),
])
.get_matches();
@ -60,16 +64,7 @@ async fn main() -> anyhow::Result<()> {
if let Some(tables) = matches.values_of("table") {
for v in tables {
let mut split = v.splitn(2, ':');
let table_name = split
.next()
.ok_or_else(|| anyhow::anyhow!("invalid table config: {}", v))?;
let uri = split
.next()
.ok_or_else(|| anyhow::anyhow!("invalid table config: {}", v))?;
config
.tables
.push(TableSource::new(table_name.to_string(), uri.to_string()));
config.tables.push(parse_table_uri_arg(v)?);
}
}