lots of goodies

* add in memory io source support
* add sql command to columnq cli
* support passing table format as columnq cli option
This commit is contained in:
Qingping Hou 2021-07-17 22:11:48 -07:00 committed by QP Hou
parent c2af31882b
commit d5359b4e56
22 changed files with 516 additions and 169 deletions

View File

@ -49,7 +49,7 @@ jobs:
with:
profile: default
# toolchain: nightly
toolchain: nightly-2021-03-24
toolchain: nightly-2021-07-17
override: true
- name: Run tests
run: cargo test --features simd

3
Cargo.lock generated
View File

@ -605,6 +605,7 @@ dependencies = [
"env_logger",
"log",
"rustyline",
"serde_json",
"tokio",
]
@ -740,7 +741,7 @@ dependencies = [
[[package]]
name = "datafusion"
version = "4.0.0-SNAPSHOT"
source = "git+https://github.com/houqp/arrow-datafusion.git?rev=e97b86a8bc410983a73b5802ae44eb7a55faecd3#e97b86a8bc410983a73b5802ae44eb7a55faecd3"
source = "git+https://github.com/houqp/arrow-datafusion.git?rev=d5427d8dd1fd8bd6fb2541c1c9386802da7783de#d5427d8dd1fd8bd6fb2541c1c9386802da7783de"
dependencies = [
"ahash",
"arrow",

View File

@ -216,8 +216,8 @@ Query layer:
- [x] REST API GET
- [x] GraphQL
- [x] SQL
- [ ] join between tables
- [ ] support filter on nested struct fields
- [x] join between tables
- [ ] support query on nested struct fields
- [ ] index
- protocol
- [ ] gRPC

View File

@ -11,6 +11,7 @@ path = "src/main.rs"
[dependencies]
columnq = { path = "../columnq" }
serde_json = "*"
log = "0"
arrow = "4"
tokio = "1"

22
columnq-cli/README.md Normal file
View File

@ -0,0 +1,22 @@
Columnq
=======
Easy to use library and CLI to help you query tabular data with support for a
rich set of growing formats data sources.
Usage
-----
Show schemas for a specific table:
```
columnq(sql)> show columns from uk_cities;
+---------------+--------------+------------+-------------+-----------+-------------+
| table_catalog | table_schema | table_name | column_name | data_type | is_nullable |
+---------------+--------------+------------+-------------+-----------+-------------+
| datafusion | public | uk_cities | city | Utf8 | NO |
| datafusion | public | uk_cities | lat | Float64 | NO |
| datafusion | public | uk_cities | lng | Float64 | NO |
+---------------+--------------+------------+-------------+-----------+-------------+
```

View File

@ -3,9 +3,11 @@ use arrow::util::pretty;
use log::debug;
use rustyline::error::ReadlineError;
use rustyline::Editor;
use serde_json;
use std::io::Read;
use std::path::PathBuf;
use columnq::table::TableSource;
use columnq::table::{TableIoSource, TableLoadOption, TableSource};
use columnq::{ColumnQ, ExecutionConfig};
fn config_path() -> anyhow::Result<PathBuf> {
@ -18,6 +20,61 @@ fn config_path() -> anyhow::Result<PathBuf> {
Ok(home)
}
fn table_arg() -> clap::Arg<'static> {
clap::Arg::new("table")
.about("Table sources to load. Table option can be provided as optional setting as part of the table URI, for example: blogs:s3://bucket/key,format=delta. Set table uri to `stdin` if you want to consume table data from stdin as part of a UNIX pipe.")
.takes_value(true)
.required(false)
.number_of_values(1)
.multiple(true)
.value_name("table_name:uri[,option_key=option_value]")
.long("table")
.short('t')
}
fn parse_table_uri_arg(uri_arg: &str) -> anyhow::Result<TableSource> {
let mut split = uri_arg.splitn(2, ':');
let table_name = split
.next()
.ok_or_else(|| anyhow::anyhow!("invalid table config: {}", uri_arg))?;
let uri = split
.next()
.ok_or_else(|| anyhow::anyhow!("invalid table config: {}", uri_arg))?;
// separate uri from table load options
let mut uri_parts = uri.split(',');
let uri = uri_parts
.next()
.ok_or_else(|| anyhow::anyhow!("invalid table URI: {}", uri))?;
let t = if uri == "stdin" {
let mut buffer = String::new();
std::io::stdin().read_to_string(&mut buffer)?;
TableSource::new(table_name, TableIoSource::Memory(buffer.into_bytes()))
} else {
TableSource::new(table_name, uri.to_string())
};
// parse extra options from table uri
let mut option_json = serde_json::map::Map::new();
for opt_str in uri_parts.into_iter() {
let mut parts = opt_str.splitn(2, '=');
let opt_key = parts
.next()
.ok_or_else(|| anyhow::anyhow!("invalid table option: {:?}", opt_str))?;
let opt_value = parts
.next()
.ok_or_else(|| anyhow::anyhow!("invalid table option: {:?}", opt_str))?;
option_json.insert(
opt_key.to_string(),
serde_json::from_str(opt_value).unwrap_or_else(|_| opt_value.into()),
);
}
let opt: TableLoadOption = serde_json::from_value(serde_json::Value::Object(option_json))?;
Ok(t.with_option(opt))
}
async fn console_loop(cq: &ColumnQ) -> anyhow::Result<()> {
let mut path = config_path()?;
if !path.as_path().exists() {
@ -72,22 +129,41 @@ async fn cmd_console(args: &clap::ArgMatches) -> anyhow::Result<()> {
if let Some(tables) = args.values_of("table") {
for v in tables {
let mut split = v.splitn(2, ':');
let table_name = split
.next()
.ok_or_else(|| anyhow::anyhow!("invalid table config: {}", v))?;
let uri = split
.next()
.ok_or_else(|| anyhow::anyhow!("invalid table config: {}", v))?;
let t = TableSource::new(table_name.to_string(), uri.to_string());
cq.load_table(&t).await?;
cq.load_table(&parse_table_uri_arg(v)?).await?;
}
}
console_loop(&cq).await
}
async fn cmd_sql(args: &clap::ArgMatches) -> anyhow::Result<()> {
let config = ExecutionConfig::default().with_information_schema(true);
let mut cq = ColumnQ::new_with_config(config);
if let Some(tables) = args.values_of("table") {
for v in tables {
cq.load_table(&parse_table_uri_arg(v)?).await?;
}
}
match args.value_of("SQL") {
Some(query) => match cq.query_sql(&query).await {
Ok(batches) => match args.value_of("output").unwrap_or("table") {
"table" => pretty::print_batches(&batches)?,
other => anyhow::bail!("unsupported output format: {}", other),
},
Err(e) => {
println!("Error: {}", e);
}
},
None => {
unreachable!();
}
}
Ok(())
}
#[tokio::main(flavor = "current_thread")]
async fn main() -> anyhow::Result<()> {
env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("info")).init();
@ -98,23 +174,40 @@ async fn main() -> anyhow::Result<()> {
.setting(clap::AppSettings::SubcommandRequiredElseHelp)
.setting(clap::AppSettings::VersionlessSubcommands)
.subcommand(
clap::App::new("console")
.about("dump table metadata info")
clap::App::new("sql")
.about("Query tables with SQL")
.setting(clap::AppSettings::ArgRequiredElseHelp)
.args(&[clap::Arg::new("table")
.about("table sources to load")
.takes_value(true)
.required(false)
.number_of_values(1)
.multiple(true)
.value_name("table_name:uri")
.long("table")
.short('t')]),
.args(&[
clap::Arg::new("SQL")
.about("SQL query to execute")
.index(1)
.required(true)
.takes_value(true)
.number_of_values(1),
clap::Arg::new("output")
.about("Query output format")
.long("output")
.short('o')
.required(false)
.takes_value(true)
.number_of_values(1)
.default_value("table")
// TODO: add yaml
.possible_values(&["table", "json", "csv"]),
table_arg(),
]),
)
.subcommand(
clap::App::new("console")
.about("Query tables through an interactive console")
.setting(clap::AppSettings::ArgRequiredElseHelp)
.args(&[table_arg()]),
);
let matches = app.get_matches();
match matches.subcommand() {
Some(("console", console_matches)) => cmd_console(console_matches).await?,
Some(("sql", console_matches)) => cmd_sql(console_matches).await?,
_ => unreachable!(),
}

View File

@ -11,7 +11,7 @@ path = "src/lib.rs"
[dependencies]
arrow = "4"
parquet = "4"
datafusion = { git = "https://github.com/houqp/arrow-datafusion.git", rev = "e97b86a8bc410983a73b5802ae44eb7a55faecd3" }
datafusion = { git = "https://github.com/houqp/arrow-datafusion.git", rev = "d5427d8dd1fd8bd6fb2541c1c9386802da7783de" }
log = "0"
regex = "1"

View File

@ -55,6 +55,9 @@ pub enum ColumnQError {
#[from]
source: datafusion::error::DataFusionError,
},
#[error("Generic error: {0}")]
Generic(String),
}
impl ColumnQError {

View File

@ -11,7 +11,7 @@ pub async fn partitions_from_uri<'a, F, T>(
where
F: FnMut(std::io::Cursor<bytes::Bytes>) -> Result<T, ColumnQError>,
{
let resp = reqwest::get(&t.uri)
let resp = reqwest::get(t.get_uri_str())
.await
.map_err(|e| ColumnQError::HttpStore(e.to_string()))?;
if resp.status().as_u16() / 100 != 2 {

15
columnq/src/io/memory.rs Normal file
View File

@ -0,0 +1,15 @@
use crate::error::ColumnQError;
use crate::table::TableSource;
pub async fn partitions_from_memory<'a, F, T>(
t: &'a TableSource,
mut partition_reader: F,
) -> Result<Vec<T>, ColumnQError>
where
F: FnMut(std::io::Cursor<&'a [u8]>) -> Result<T, ColumnQError>,
{
let reader = std::io::Cursor::new(t.io_source.as_memory()?);
// There is no concept of directory listing for in memory data, so we always only return a
// single partition
Ok(vec![partition_reader(reader)?])
}

View File

@ -2,14 +2,17 @@ use std::convert::TryFrom;
pub mod fs;
pub mod http;
pub mod memory;
pub mod s3;
use crate::error::ColumnQError;
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum BlobStoreType {
Http,
S3,
FileSystem,
Memory,
}
impl TryFrom<Option<&uriparse::Scheme<'_>>> for BlobStoreType {
@ -22,6 +25,7 @@ impl TryFrom<Option<&uriparse::Scheme<'_>>> for BlobStoreType {
Some(uriparse::Scheme::HTTP) | Some(uriparse::Scheme::HTTPS) => Ok(BlobStoreType::Http),
Some(uriparse::Scheme::Unregistered(s)) => match s.as_str() {
"s3" => Ok(BlobStoreType::S3),
"memory" => Ok(BlobStoreType::Memory),
_ => Err(ColumnQError::InvalidUri(format!(
"Unsupported scheme: {:?}",
scheme

View File

@ -202,7 +202,8 @@ where
F: FnMut(std::io::Cursor<Vec<u8>>) -> Result<T, ColumnQError>,
{
let client = rusoto_s3::S3Client::new(rusoto_core::Region::default());
let (bucket, key) = parse_uri(&t.uri)?;
// TODO: use host and path from URIReference instead
let (bucket, key) = parse_uri(&t.get_uri_str())?;
let mut partitions = vec![];

View File

@ -21,6 +21,9 @@ macro_rules! partitions_from_table_source {
io::BlobStoreType::S3 => {
io::s3::partitions_from_uri(&$table_source, uri, $call_with_r).await
}
io::BlobStoreType::Memory => {
io::memory::partitions_from_memory(&$table_source, $call_with_r).await
}
}
}};
}

View File

@ -5,16 +5,22 @@ use arrow::record_batch::RecordBatch;
use log::debug;
use crate::error::ColumnQError;
use crate::table::TableSource;
use crate::table::{TableLoadOption, TableOptionCsv, TableSource};
pub async fn to_mem_table(
t: &TableSource,
) -> Result<datafusion::datasource::MemTable, ColumnQError> {
// TODO: read csv option from config
let has_header = true;
let delimiter = b',';
let opt = t
.option
.clone()
.unwrap_or_else(|| TableLoadOption::csv(TableOptionCsv::default()));
let opt = opt.as_csv()?;
let has_header = opt.has_header;
let delimiter = opt.delimiter;
let projection = opt.projection.as_ref();
let batch_size = 1024;
let projection = None;
debug!("inferring csv table schema...");
let schema_ref: arrow::datatypes::SchemaRef = match &t.schema {
@ -48,7 +54,7 @@ pub async fn to_mem_table(
Some(delimiter),
batch_size,
None,
projection.clone(),
projection.cloned(),
);
csv_reader
@ -70,7 +76,7 @@ mod tests {
use datafusion::datasource::TableProvider;
use crate::table::TableLoadOption;
use crate::table::{TableIoSource, TableLoadOption};
use crate::test_util::*;
#[tokio::test]
@ -83,16 +89,40 @@ mod tests {
assert!(fs::copy(&source_path, tmp_dir_path.join("2020-01-02.csv"))? > 0);
assert!(fs::copy(&source_path, tmp_dir_path.join("2020-01-03.csv"))? > 0);
let t = to_mem_table(&TableSource {
name: "uk_cities".to_string(),
uri: tmp_dir_path.to_string_lossy().to_string(),
schema: None,
option: Some(TableLoadOption::csv {}),
})
let t = to_mem_table(
&TableSource::new(
"uk_cities".to_string(),
tmp_dir_path.to_string_lossy().to_string(),
)
.with_option(TableLoadOption::csv(
TableOptionCsv::default().with_has_header(true),
)),
)
.await?;
assert_eq!(t.statistics().num_rows, Some(37 * 3));
Ok(())
}
#[tokio::test]
async fn load_from_memory() -> anyhow::Result<()> {
let csv_content = r#"
c1,c2,c3
1,"hello",true
2,"world",true
100,"!",false
"#
.to_string();
let source = TableSource::new("test", TableIoSource::Memory(csv_content.into_bytes()))
.with_option(TableLoadOption::csv(
TableOptionCsv::default().with_has_header(true),
));
let t = to_mem_table(&source).await?;
assert_eq!(t.statistics().num_rows, Some(3));
Ok(())
}
}

View File

@ -36,7 +36,8 @@ pub async fn to_mem_table(
// TODO: make batch size configurable
let batch_size = 1024;
let delta_table = deltalake::open_table(&t.uri).await?;
let uri_str = t.get_uri_str();
let delta_table = deltalake::open_table(uri_str).await?;
if delta_table.get_files().is_empty() {
return Err(ColumnQError::LoadDelta("empty delta table".to_string()));
@ -67,8 +68,8 @@ pub async fn to_mem_table(
}
_ => {
return Err(ColumnQError::InvalidUri(format!(
"Scheme in table uri not supported for delta table: {:?}",
t.uri
"Scheme in table uri not supported for delta table: {}",
uri_str,
)));
}
};

View File

@ -307,18 +307,19 @@ pub async fn to_mem_table(
static ref RE_GOOGLE_SHEET: Regex =
Regex::new(r"https://docs.google.com/spreadsheets/d/(.+)").unwrap();
}
if RE_GOOGLE_SHEET.captures(&t.uri).is_none() {
return Err(ColumnQError::InvalidUri(t.uri.to_string()));
let uri_str = t.get_uri_str();
if RE_GOOGLE_SHEET.captures(uri_str).is_none() {
return Err(ColumnQError::InvalidUri(uri_str.to_string()));
}
let uri = URIReference::try_from(t.uri.as_str())?;
let uri = URIReference::try_from(uri_str)?;
let spreadsheet_id = uri.path().segments()[2].as_str();
let opt = t
.option
.as_ref()
.ok_or(ColumnQError::MissingOption)?
.as_google_spreadsheet_opt()?;
.as_google_spreadsheet()?;
let token = fetch_auth_token(&opt).await?;
let token_str = token.as_str();

View File

@ -175,12 +175,10 @@ mod tests {
#[tokio::test]
async fn nested_struct_and_lists() -> Result<(), ColumnQError> {
let t = to_mem_table(&TableSource {
name: "spacex_launches".to_string(),
uri: test_data_path("spacex-launches.json"),
schema: None,
option: None,
})
let t = to_mem_table(&TableSource::new(
"spacex_launches".to_string(),
test_data_path("spacex-launches.json"),
))
.await?;
let schema = t.schema();

View File

@ -2,6 +2,7 @@ use std::convert::TryFrom;
use std::ffi::OsStr;
use std::path::Path;
use serde::de::{Deserialize, Deserializer};
use serde_derive::Deserialize;
use uriparse::URIReference;
@ -52,6 +53,70 @@ pub struct TableOptionGoogleSpreasheet {
sheet_title: Option<String>,
}
/// CSV table load option
#[derive(Deserialize, Debug, Clone)]
pub struct TableOptionCsv {
#[serde(default = "TableOptionCsv::default_has_header")]
has_header: bool,
#[serde(default = "TableOptionCsv::default_delimiter")]
#[serde(deserialize_with = "TableOptionCsv::deserialize_delimiter")]
delimiter: u8,
#[serde(default = "TableOptionCsv::default_projection")]
projection: Option<Vec<usize>>,
}
impl TableOptionCsv {
#[inline]
pub fn default_has_header() -> bool {
true
}
#[inline]
pub fn default_delimiter() -> u8 {
b','
}
#[inline]
pub fn default_projection() -> Option<Vec<usize>> {
None
}
#[inline]
pub fn with_delimiter(mut self, d: u8) -> Self {
self.delimiter = d;
self
}
#[inline]
pub fn with_has_header(mut self, has_header: bool) -> Self {
self.has_header = has_header;
self
}
fn deserialize_delimiter<'de, D>(deserializer: D) -> Result<u8, D::Error>
where
D: Deserializer<'de>,
{
let buf = String::deserialize(deserializer)?;
match buf.len() {
1 => Ok(buf.into_bytes()[0]),
_ => Err(serde::de::Error::custom(
"CSV delimiter should be a single character",
)),
}
}
}
impl Default for TableOptionCsv {
fn default() -> Self {
Self {
has_header: Self::default_has_header(),
delimiter: Self::default_delimiter(),
projection: Self::default_projection(),
}
}
}
// Adding new table format:
// * update TableLoadOption enum to add the new variant
// * update TableLoadOption.extension
@ -68,7 +133,7 @@ pub enum TableLoadOption {
pointer: Option<String>,
array_encoded: Option<bool>,
},
csv {},
csv(TableOptionCsv),
ndjson {},
parquet {},
google_spreadsheet(TableOptionGoogleSpreasheet),
@ -76,22 +141,63 @@ pub enum TableLoadOption {
}
impl TableLoadOption {
fn as_google_spreadsheet_opt(&self) -> Result<&TableOptionGoogleSpreasheet, ColumnQError> {
fn as_google_spreadsheet(&self) -> Result<&TableOptionGoogleSpreasheet, ColumnQError> {
match self {
TableLoadOption::google_spreadsheet(opt) => Ok(&opt),
Self::google_spreadsheet(opt) => Ok(&opt),
_ => Err(ColumnQError::ExpectFormatOption(
"google_spreadsheets".to_string(),
)),
}
}
fn as_csv(&self) -> Result<&TableOptionCsv, ColumnQError> {
match self {
Self::csv(opt) => Ok(&opt),
_ => Err(ColumnQError::ExpectFormatOption("csv".to_string())),
}
}
pub fn extension<'a>(&'a self) -> &'static str {
match self {
TableLoadOption::json { .. } => "json",
TableLoadOption::ndjson { .. } => "ndjson",
TableLoadOption::csv { .. } => "csv",
TableLoadOption::parquet { .. } => "parquet",
TableLoadOption::google_spreadsheet(_) | TableLoadOption::delta { .. } => "",
Self::json { .. } => "json",
Self::ndjson { .. } => "ndjson",
Self::csv { .. } => "csv",
Self::parquet { .. } => "parquet",
Self::google_spreadsheet(_) | Self::delta { .. } => "",
}
}
}
#[derive(Deserialize, Clone, Debug, PartialEq, Eq)]
#[serde(rename_all = "lowercase")]
pub enum TableIoSource {
Uri(String),
Memory(Vec<u8>),
}
impl<T: Into<String>> From<T> for TableIoSource {
fn from(s: T) -> Self {
Self::Uri(s.into())
}
}
impl TableIoSource {
pub fn as_memory(&self) -> Result<&[u8], ColumnQError> {
match self {
Self::Memory(data) => Ok(&data),
other => Err(ColumnQError::Generic(format!(
"expect memory IO source, got: {:?}",
other
))),
}
}
}
impl std::fmt::Display for TableIoSource {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
match self {
TableIoSource::Uri(uri) => write!(f, "uri({})", uri),
TableIoSource::Memory(_) => write!(f, "memory"),
}
}
}
@ -100,38 +206,81 @@ impl TableLoadOption {
#[serde(deny_unknown_fields)]
pub struct TableSource {
pub name: String,
pub uri: String,
#[serde(flatten)]
pub io_source: TableIoSource,
pub schema: Option<TableSchema>,
pub option: Option<TableLoadOption>,
}
impl TableSource {
pub fn new(name: String, uri: String) -> Self {
// TODO: parse table format from uri during initializeion?
pub fn new(name: impl Into<String>, source: impl Into<TableIoSource>) -> Self {
Self {
name,
uri,
name: name.into(),
io_source: source.into(),
schema: None,
option: None,
}
}
pub fn new_with_uri(name: impl Into<String>, uri: impl Into<String>) -> Self {
Self::new(name, uri.into())
}
pub fn with_option(mut self, option: impl Into<TableLoadOption>) -> Self {
self.option = Some(option.into());
self
}
pub fn with_schema(mut self, schema: impl Into<TableSchema>) -> Self {
self.schema = Some(schema.into());
self
}
pub fn get_uri_str(&self) -> &str {
match &self.io_source {
TableIoSource::Uri(uri) => uri.as_str(),
TableIoSource::Memory(_) => "memory",
}
}
pub fn parsed_uri(&self) -> Result<URIReference, ColumnQError> {
URIReference::try_from(self.uri.as_str())
.map_err(|_| ColumnQError::InvalidUri(self.uri.clone()))
match &self.io_source {
TableIoSource::Uri(uri) => URIReference::try_from(uri.as_str())
.map_err(|_| ColumnQError::InvalidUri(uri.to_string())),
TableIoSource::Memory(_) => URIReference::builder()
.with_scheme(Some(uriparse::Scheme::try_from("memory").map_err(|e| {
ColumnQError::Generic(format!(
"failed to create uri scheme for memory IO source: {:?}",
e
))
})?))
.with_path(uriparse::Path::try_from("data").map_err(|e| {
ColumnQError::Generic(format!(
"failed to create uri path for memory IO source: {:?}",
e
))
})?)
.build()
.map_err(|e| {
ColumnQError::Generic(format!(
"failed to create uri for memory IO source: {:?}",
e
))
}),
}
}
pub fn extension(&self) -> Result<&str, ColumnQError> {
Ok(match &self.option {
Some(opt) => opt.extension(),
None => {
let ext = Path::new(&self.uri)
Ok(match (&self.option, &self.io_source) {
(Some(opt), _) => opt.extension(),
(None, TableIoSource::Uri(uri)) => {
let ext = Path::new(uri)
.extension()
.and_then(OsStr::to_str)
.ok_or_else(|| {
ColumnQError::InvalidUri(format!(
"cannot detect table extension from uri: {}",
self.uri
uri
))
})?;
@ -140,11 +289,16 @@ impl TableSource {
_ => {
return Err(ColumnQError::InvalidUri(format!(
"unsupported extension in uri: {}",
self.uri
uri
)));
}
}
}
(None, TableIoSource::Memory(_)) => {
return Err(ColumnQError::Generic(
"cannot detect table extension from memory IO source, please specify a format option".to_string()
));
}
})
}
}
@ -162,24 +316,50 @@ pub async fn load(t: &TableSource) -> Result<datafusion::datasource::MemTable, C
}
// no format specified explictly, try to guess from file path
Ok(
match Path::new(&t.uri).extension().and_then(OsStr::to_str) {
Some("csv") => csv::to_mem_table(t).await?,
Some("json") => json::to_mem_table(t).await?,
Some("ndjson") => ndjson::to_mem_table(t).await?,
Some("parquet") => parquet::to_mem_table(t).await?,
Some(ext) => {
return Err(ColumnQError::InvalidUri(format!(
"failed to register `{}` as table `{}`, unsupported table format `{}`",
t.uri, t.name, ext,
)));
}
None => {
return Err(ColumnQError::InvalidUri(format!(
"failed to register `{}` as table `{}`, cannot detect table format",
t.uri, t.name
)));
}
},
)
let t = match t.extension()? {
"csv" => csv::to_mem_table(t).await?,
"json" => json::to_mem_table(t).await?,
"ndjson" => ndjson::to_mem_table(t).await?,
"parquet" => parquet::to_mem_table(t).await?,
ext => {
return Err(ColumnQError::InvalidUri(format!(
"failed to register `{}` as table `{}`, unsupported table format `{}`",
t.io_source, t.name, ext,
)));
}
};
Ok(t)
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn uri_deserialization() -> anyhow::Result<()> {
let table_source: TableSource = serde_yaml::from_str(
r#"
name: "ubuntu_ami"
uri: "test_data/ubuntu-ami.json"
option:
format: "json"
pointer: "/aaData"
array_encoded: true
schema:
columns:
- name: "zone"
data_type: "Utf8"
- name: "name"
data_type: "Utf8"
"#,
)?;
assert_eq!(
table_source.io_source,
TableIoSource::Uri("test_data/ubuntu-ami.json".to_string())
);
Ok(())
}
}

View File

@ -77,12 +77,10 @@ mod tests {
#[tokio::test]
async fn load_simple_parquet() -> Result<(), ColumnQError> {
let t = to_mem_table(&TableSource {
name: "blogs".to_string(),
uri: test_data_path("blogs.parquet"),
schema: None,
option: None,
})
let t = to_mem_table(&TableSource::new(
"blogs".to_string(),
test_data_path("blogs.parquet"),
))
.await?;
let schema = t.schema();
@ -109,12 +107,10 @@ mod tests {
assert!(fs::copy(&source_path, tmp_dir_path.join("2020-01-02.parquet"))? > 0);
assert!(fs::copy(&source_path, tmp_dir_path.join("2020-01-03.parquet"))? > 0);
let t = to_mem_table(&TableSource {
name: "blogs".to_string(),
uri: tmp_dir_path.to_string_lossy().to_string(),
schema: None,
option: Some(TableLoadOption::parquet {}),
})
let t = to_mem_table(
&TableSource::new_with_uri("blogs", tmp_dir_path.to_string_lossy())
.with_option(TableLoadOption::parquet {}),
)
.await?;
assert_eq!(

View File

@ -119,7 +119,8 @@ schema:
"#,
)?;
table_source.uri = test_data_path("ubuntu-ami.json");
// patch uri path with the correct test data path
table_source.io_source = table::TableIoSource::Uri(test_data_path("ubuntu-ami.json"));
Ok(table::load(&table_source).await?)
}

View File

@ -22,9 +22,9 @@ impl HandlerContext {
}
for t in config.tables.iter() {
info!("loading `{}` as table `{}`", t.uri, t.name);
info!("loading `{}` as table `{}`", t.io_source, t.name);
cq.load_table(t).await?;
info!("registered `{}` as table `{}`", t.uri, t.name);
info!("registered `{}` as table `{}`", t.io_source, t.name);
}
Ok(Self { cq })

View File

@ -44,12 +44,10 @@ mod tests {
web::Data::new(
HandlerContext::new(&Config {
addr: None,
tables: vec![TableSource {
name: "uk_cities".to_string(),
uri: test_data_path("uk_cities_with_headers.csv"),
schema: None,
option: None,
}],
tables: vec![TableSource::new(
"uk_cities",
test_data_path("uk_cities_with_headers.csv"),
)],
})
.await
.unwrap(),
@ -60,58 +58,57 @@ mod tests {
web::Data::new(
HandlerContext::new(&Config {
addr: None,
tables: vec![TableSource {
name: "ubuntu_ami".to_string(),
uri: test_data_path("ubuntu-ami.json"),
option: Some(TableLoadOption::json {
pointer: Some("/aaData".to_string()),
array_encoded: Some(true),
}),
schema: Some(TableSchema {
columns: vec![
TableColumn {
name: "zone".to_string(),
data_type: arrow::datatypes::DataType::Utf8,
nullable: true,
},
TableColumn {
name: "name".to_string(),
data_type: arrow::datatypes::DataType::Utf8,
nullable: true,
},
TableColumn {
name: "version".to_string(),
data_type: arrow::datatypes::DataType::Utf8,
nullable: true,
},
TableColumn {
name: "arch".to_string(),
data_type: arrow::datatypes::DataType::Utf8,
nullable: true,
},
TableColumn {
name: "instance_type".to_string(),
data_type: arrow::datatypes::DataType::Utf8,
nullable: true,
},
TableColumn {
name: "release".to_string(),
data_type: arrow::datatypes::DataType::Utf8,
nullable: true,
},
TableColumn {
name: "ami_id".to_string(),
data_type: arrow::datatypes::DataType::Utf8,
nullable: true,
},
TableColumn {
name: "aki_id".to_string(),
data_type: arrow::datatypes::DataType::Utf8,
nullable: true,
},
],
}),
}],
tables: vec![
TableSource::new("ubuntu_ami", test_data_path("ubuntu-ami.json"))
.with_option(TableLoadOption::json {
pointer: Some("/aaData".to_string()),
array_encoded: Some(true),
})
.with_schema(TableSchema {
columns: vec![
TableColumn {
name: "zone".to_string(),
data_type: arrow::datatypes::DataType::Utf8,
nullable: true,
},
TableColumn {
name: "name".to_string(),
data_type: arrow::datatypes::DataType::Utf8,
nullable: true,
},
TableColumn {
name: "version".to_string(),
data_type: arrow::datatypes::DataType::Utf8,
nullable: true,
},
TableColumn {
name: "arch".to_string(),
data_type: arrow::datatypes::DataType::Utf8,
nullable: true,
},
TableColumn {
name: "instance_type".to_string(),
data_type: arrow::datatypes::DataType::Utf8,
nullable: true,
},
TableColumn {
name: "release".to_string(),
data_type: arrow::datatypes::DataType::Utf8,
nullable: true,
},
TableColumn {
name: "ami_id".to_string(),
data_type: arrow::datatypes::DataType::Utf8,
nullable: true,
},
TableColumn {
name: "aki_id".to_string(),
data_type: arrow::datatypes::DataType::Utf8,
nullable: true,
},
],
}),
],
})
.await
.unwrap(),