chore: bump version for axum, dirs, sqlparser, yup-auth2, (#145)

* chore: bump version for axum, dirs, sqlparser, yup-auth2,

* chore: Bump datafusion to 7 and Delta-rs to f5e4b5f94393b517d3d88c200d7169cce11b304b

* fix(test): query are now case insensitive in datafusion

* fix(test): Remove println! statements
This commit is contained in:
Tiphaine Ruy 2022-02-25 07:57:03 +01:00 committed by GitHub
parent cd271c65ac
commit 646c467916
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
21 changed files with 605 additions and 362 deletions

773
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@ -1,20 +1,6 @@
[workspace]
members = [
"columnq",
"columnq-cli",
"roapi-http",
]
[patch.crates-io]
datafusion = { git = "https://github.com/houqp/arrow-datafusion.git", rev = "50a98805cdd6d5383f0c124d449ed549ca13428a" }
# datafusion = { path = "/home/houqp/Documents/code/delta/arrow-datafusion/datafusion" }
deltalake = { git = "https://github.com/delta-io/delta-rs.git", rev = "70f520a15ae7884c46142bfb0dbc8790f43d8701" }
[patch."https://github.com/apache/arrow-datafusion"]
datafusion = { git = "https://github.com/houqp/arrow-datafusion.git", rev = "50a98805cdd6d5383f0c124d449ed549ca13428a" }
# datafusion = { path = "/home/houqp/Documents/code/delta/arrow-datafusion/datafusion" }
members = ["columnq", "columnq-cli", "roapi-http"]
[profile.dev]
split-debuginfo = "unpacked"

View File

@ -23,7 +23,7 @@ rustyline = { version = "9" }
env_logger = { version = "0" }
anyhow = { version = "1" }
clap = { version = "3", features = ["color"] }
dirs = { version = "3" }
dirs = { version = "4" }
[features]
default = ["rustls", "snmalloc"]

View File

@ -158,15 +158,18 @@ async fn cmd_sql(args: &clap::ArgMatches) -> anyhow::Result<()> {
async fn main() -> anyhow::Result<()> {
env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("info")).init();
let app = clap::App::new("Columnq")
let app = clap::Command::new("Columnq")
.version(env!("CARGO_PKG_VERSION"))
.author("QP Hou")
.about("OLAP the Unix way.")
.setting(clap::AppSettings::SubcommandRequiredElseHelp)
.arg_required_else_help(true)
.subcommand_required(true)
// .setting(clap::Command::SubcommandRequiredElseHelp)
// .setting(clap::Command::ar)
.subcommand(
clap::App::new("sql")
clap::Command::new("sql")
.about("Query tables with SQL")
.setting(clap::AppSettings::ArgRequiredElseHelp)
.arg_required_else_help(true)
.args(&[
clap::Arg::new("SQL")
.help("SQL query to execute")
@ -188,9 +191,9 @@ async fn main() -> anyhow::Result<()> {
]),
)
.subcommand(
clap::App::new("console")
clap::Command::new("console")
.about("Query tables through an interactive console")
.setting(clap::AppSettings::ArgRequiredElseHelp)
.arg_required_else_help(true)
.args(&[table_arg()]),
);
let matches = app.get_matches();

View File

@ -11,21 +11,24 @@ name = "columnq"
path = "src/lib.rs"
[dependencies]
datafusion = "6"
datafusion = "7"
log = "0"
regex = "1"
lazy_static = "1"
graphql-parser = "0"
sqlparser = "0.10"
yup-oauth2 = { version = "5.1", default-features = false }
sqlparser = "0.14"
yup-oauth2 = { version = "6.2", default-features = false }
thiserror = "1"
serde_json = "1"
serde_derive = "1"
serde = "1"
uriparse = "0"
bytes = { version = "1" }
reqwest = { version = "0.11", default-features = false, features = ["blocking", "json"] }
reqwest = { version = "0.11", default-features = false, features = [
"blocking",
"json",
] }
tokio = { version = "1", features = ["rt-multi-thread"] }
futures = "0.3"
@ -36,10 +39,12 @@ rusoto_s3 = { version = "0.47", default-features = false }
rusoto_credential = { version = "0.47" }
rusoto_sts = { version = "0.47", default-features = false }
hyper-tls = { version = "0.5.0", default-features = false, optional = true }
hyper-rustls = { version = "0.22.1", default-features = false, optional = true }
hyper-rustls = { version = "0.23.0", default-features = false, optional = true }
deltalake = { version = "0", default-features = false, features = ["datafusion-ext"] }
deltalake = { git = "https://github.com/delta-io/delta-rs.git", rev = "f5e4b5f94393b517d3d88c200d7169cce11b304b", default-features = false, features = [
"datafusion-ext",
] }
[dev-dependencies]
anyhow = "1"
@ -49,7 +54,29 @@ pretty_assertions = "*"
[features]
default = ["rustls"]
rustls = ["reqwest/rustls-tls", "rusoto_core/rustls", "rusoto_core/rustls", "rusoto_sts/rustls", "deltalake/s3-rustls", "yup-oauth2/hyper-rustls"]
native-tls-vendored = ["reqwest/native-tls-vendored", "hyper-tls/vendored", "rusoto_core/native-tls", "rusoto_s3/native-tls", "rusoto_sts/native-tls", "deltalake/s3", "yup-oauth2/hyper-tls"]
native-tls = ["reqwest/native-tls", "rusoto_core/native-tls", "rusoto_s3/native-tls", "rusoto_sts/native-tls", "deltalake/s3", "yup-oauth2/hyper-tls"]
rustls = [
"reqwest/rustls-tls",
"rusoto_core/rustls",
"rusoto_core/rustls",
"rusoto_sts/rustls",
"deltalake/s3-rustls",
"yup-oauth2/hyper-rustls",
]
native-tls-vendored = [
"reqwest/native-tls-vendored",
"hyper-tls/vendored",
"rusoto_core/native-tls",
"rusoto_s3/native-tls",
"rusoto_sts/native-tls",
"deltalake/s3",
"yup-oauth2/hyper-tls",
]
native-tls = [
"reqwest/native-tls",
"rusoto_core/native-tls",
"rusoto_s3/native-tls",
"rusoto_sts/native-tls",
"deltalake/s3",
"yup-oauth2/hyper-tls",
]
simd = ["datafusion/simd"]

View File

@ -36,6 +36,13 @@ impl ColumnQ {
&self.schema_map
}
pub fn serializable_schema_map(&self) -> HashMap<&String, &arrow::datatypes::Schema> {
self.schema_map
.iter()
.map(|(k, v)| (k, v.as_ref()))
.collect()
}
pub async fn query_graphql(
&self,
query: &str,

View File

@ -1,10 +1,12 @@
use datafusion::arrow;
use crate::error::ColumnQError;
pub fn record_batches_to_bytes(
batches: &[arrow::record_batch::RecordBatch],
) -> Result<Vec<u8>, serde_json::Error> {
let json_rows = arrow::json::writer::record_batches_to_json_rows(batches);
serde_json::to_vec(&json_rows)
) -> Result<Vec<u8>, ColumnQError> {
let json_rows = arrow::json::writer::record_batches_to_json_rows(batches)?;
serde_json::to_vec(&json_rows).map_err(ColumnQError::json_parse)
}
#[cfg(test)]

View File

@ -5,7 +5,7 @@ use datafusion::parquet::errors::ParquetError;
pub fn record_batches_to_bytes(
batches: &[arrow::record_batch::RecordBatch],
) -> Result<Vec<u8>, ParquetError> {
let cursor = parquet::util::cursor::InMemoryWriteableCursor::default();
let cursor = parquet::file::writer::InMemoryWriteableCursor::default();
{
if !batches.is_empty() {
let schema = batches[0].schema();

View File

@ -57,6 +57,12 @@ pub enum ColumnQError {
source: datafusion::error::DataFusionError,
},
#[error("Serde error: {source}")]
Serde {
#[from]
source: serde_json::Error,
},
#[error("Generic error: {0}")]
Generic(String),
}

View File

@ -35,10 +35,10 @@ mod tests {
let batches = exec_query(
&dfctx,
r#"
SELECT DISTINCT(Landlord), COUNT(Address)
SELECT DISTINCT(landlord), COUNT(address)
FROM properties
GROUP BY Landlord
ORDER BY Landlord
GROUP BY landlord
ORDER BY landlord
"#,
)
.await?;

View File

@ -75,7 +75,7 @@ mod tests {
)
.await?;
let stats = t.scan(&None, 1024, &[], None).await?.statistics();
let stats = t.scan(&None, &[], None).await?.statistics();
assert_eq!(stats.num_rows, Some(37 * 3));
Ok(())
@ -87,7 +87,7 @@ mod tests {
let t = to_mem_table(&TableSource::new("uk_cities".to_string(), test_path)).await?;
let stats = t.scan(&None, 1024, &[], None).await?.statistics();
let stats = t.scan(&None, &[], None).await?.statistics();
assert_eq!(stats.num_rows, Some(37));
Ok(())

View File

@ -75,7 +75,7 @@ mod tests {
)
.await?;
let stats = t.scan(&None, 1024, &[], None).await?.statistics();
let stats = t.scan(&None, &[], None).await?.statistics();
assert_eq!(stats.num_rows, Some(37 * 3));
Ok(())
@ -87,7 +87,7 @@ mod tests {
let t = to_mem_table(&TableSource::new("uk_cities".to_string(), test_path)).await?;
let stats = t.scan(&None, 1024, &[], None).await?.statistics();
let stats = t.scan(&None, &[], None).await?.statistics();
assert_eq!(stats.num_rows, Some(37));
Ok(())

View File

@ -56,6 +56,7 @@ pub async fn to_mem_table(
batch_size,
None,
projection.cloned(),
None,
);
csv_reader
@ -100,7 +101,7 @@ mod tests {
)
.await?;
let stats = t.scan(&None, 1024, &[], None).await?.statistics();
let stats = t.scan(&None, &[], None).await?.statistics();
assert_eq!(stats.num_rows, Some(37 * 3));
Ok(())
@ -122,7 +123,7 @@ c1,c2,c3
));
let t = to_mem_table(&source).await?;
let stats = t.scan(&None, 1024, &[], None).await?.statistics();
let stats = t.scan(&None, &[], None).await?.statistics();
assert_eq!(stats.num_rows, Some(3));
Ok(())

View File

@ -84,8 +84,8 @@ pub async fn to_mem_table(
let delta_schema = delta_table.get_schema()?;
let paths = delta_table.get_file_uris();
let path_iter = paths.iter().map(|s| s.as_str());
let paths = delta_table.get_file_uris().collect::<Vec<String>>();
let path_iter = paths.iter().map(|s| s.as_ref());
let partitions: Vec<Vec<RecordBatch>> = match blob_type {
io::BlobStoreType::FileSystem => io::fs::partitions_from_iterator(
@ -140,7 +140,7 @@ mod tests {
)
.await?;
validate_statistics(t.scan(&None, 1024, &[], None).await?.statistics());
validate_statistics(t.scan(&None, &[], None).await?.statistics());
match t.as_any().downcast_ref::<MemTable>() {
Some(_) => Ok(()),

View File

@ -8,7 +8,7 @@ use datafusion::arrow;
use datafusion::arrow::datatypes::Schema;
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::datasource::file_format::parquet::ParquetFormat;
use datafusion::datasource::listing::{ListingOptions, ListingTable};
use datafusion::datasource::listing::{ListingOptions, ListingTable, ListingTableConfig};
use datafusion::datasource::object_store::local::LocalFileSystem;
use datafusion::datasource::TableProvider;
use datafusion::parquet::arrow::{ArrowReader, ParquetFileArrowReader};
@ -39,12 +39,11 @@ pub async fn to_datafusion_table(t: &TableSource) -> Result<Arc<dyn TableProvide
}
};
Ok(Arc::new(ListingTable::new(
df_object_store,
table_path,
file_schema,
list_opt,
)))
Ok(Arc::new(ListingTable::try_new(
ListingTableConfig::new(df_object_store, table_path)
.with_schema(file_schema)
.with_listing_options(list_opt),
)?))
}
}
@ -122,7 +121,7 @@ mod tests {
)
.await?;
let stats = t.scan(&None, 1024, &[], None).await?.statistics();
let stats = t.scan(&None, &[], None).await?.statistics();
assert_eq!(stats.num_rows, Some(500));
let stats = stats.column_statistics.unwrap();
assert_eq!(stats[0].null_count, Some(245));
@ -152,7 +151,7 @@ mod tests {
Some("protobuf")
);
let stats = t.scan(&None, 1024, &[], None).await?.statistics();
let stats = t.scan(&None, &[], None).await?.statistics();
assert_eq!(stats.num_rows, Some(500));
Ok(())
@ -182,7 +181,7 @@ mod tests {
Some("protobuf")
);
let stats = t.scan(&None, 1024, &[], None).await?.statistics();
let stats = t.scan(&None, &[], None).await?.statistics();
assert_eq!(stats.num_rows, Some(1500));
Ok(())

View File

@ -19,13 +19,13 @@ pub fn test_data_path(relative_path: &str) -> String {
fn properties_table() -> anyhow::Result<MemTable> {
let schema = Arc::new(Schema::new(vec![
Field::new("Address", DataType::Utf8, false),
Field::new("Landlord", DataType::Utf8, false),
Field::new("Bed", DataType::Int64, false),
Field::new("Bath", DataType::Int64, false),
Field::new("Occupied", DataType::Boolean, false),
Field::new("Monthly_Rent", DataType::Utf8, false),
Field::new("Lease_Expiration_Date", DataType::Utf8, false),
Field::new("address", DataType::Utf8, false),
Field::new("landlord", DataType::Utf8, false),
Field::new("bed", DataType::Int64, false),
Field::new("bath", DataType::Int64, false),
Field::new("occupied", DataType::Boolean, false),
Field::new("monthly_rent", DataType::Utf8, false),
Field::new("lease_expiration_date", DataType::Utf8, false),
]));
let record_batch = RecordBatch::try_new(

View File

@ -22,7 +22,7 @@ snmalloc-rs = { version = "0.2", optional = true }
# dependencies related to axum
tokio = { version = "1", features = ["rt-multi-thread"] }
hyper = { version = "0", features = ["http1", "server", "stream", "runtime"] }
axum = { version = "0.3", features = ["default", "http2"] }
axum = { version = "0.4", features = ["default", "http2"] }
tower-http = { version = "0", features = ["cors"] }
tower-layer = "0"
tracing = "0"
@ -47,7 +47,10 @@ simd = ["columnq/simd"]
snmalloc = ["snmalloc-rs"]
[dev-dependencies]
reqwest = { version = "0.11", default-features = false, features = ["json", "rustls-tls"]}
reqwest = { version = "0.11", default-features = false, features = [
"json",
"rustls-tls",
] }
async-process = "1.3.0"
# TODO: uncomment this when we exclude roapi-http from root workspace

View File

@ -8,8 +8,10 @@ pub async fn schema(
state: extract::Extension<Arc<HandlerContext>>,
) -> Result<impl IntoResponse, ApiErrResp> {
let ctx = state.0;
let payload =
serde_json::to_vec(ctx.cq.schema_map()).map_err(ApiErrResp::json_serialization)?;
let schema = ctx.cq.serializable_schema_map();
let payload = serde_json::to_vec(&schema)
.map_err(columnq::error::ColumnQError::from)
.map_err(ApiErrResp::json_serialization)?;
Ok(bytes_to_json_resp(payload))
}
@ -20,10 +22,11 @@ pub async fn get_by_table_name(
let ctx = state.0;
let payload = serde_json::to_vec(
ctx.cq
.schema_map()
.serializable_schema_map()
.get(&table_name)
.ok_or_else(|| ApiErrResp::not_found("invalid table name"))?,
)
.map_err(columnq::error::ColumnQError::from)
.map_err(ApiErrResp::json_serialization)?;
Ok(bytes_to_json_resp(payload))
}

View File

@ -44,13 +44,13 @@ fn config_arg() -> clap::Arg<'static> {
}
pub fn get_configuration() -> Result<Config, anyhow::Error> {
let matches = clap::App::new("roapi-http")
let matches = clap::Command::new("roapi-http")
.version(env!("CARGO_PKG_VERSION"))
.author("QP Hou")
.about(
"Create full-fledged APIs for static datasets without writing a single line of code.",
)
.setting(clap::AppSettings::ArgRequiredElseHelp)
.arg_required_else_help(true)
.args(&[address_arg(), config_arg(), table_arg()])
.get_matches();

View File

@ -32,7 +32,7 @@ impl ApiErrResp {
}
}
pub fn json_serialization(_: serde_json::Error) -> Self {
pub fn json_serialization(_: columnq::error::ColumnQError) -> Self {
Self {
code: http::StatusCode::INTERNAL_SERVER_ERROR,
error: "json_serialization".to_string(),
@ -108,13 +108,10 @@ impl fmt::Display for ApiErrResp {
}
impl axum::response::IntoResponse for ApiErrResp {
type Body = axum::body::Body;
type BodyError = <Self::Body as axum::body::HttpBody>::Error;
fn into_response(self) -> axum::response::Response {
let payload = serde_json::to_string(&self).unwrap();
let body = axum::body::boxed(axum::body::Full::from(payload));
fn into_response(self) -> Response<axum::body::Body> {
let payload = serde_json::to_vec(&self).unwrap();
let mut res = Response::new(axum::body::Body::from(payload));
*res.status_mut() = self.code;
res
Response::builder().status(self.code).body(body).unwrap()
}
}

View File

@ -33,7 +33,7 @@ impl Application {
let routes = api::routes::register_app_routes();
let cors = tower_http::cors::CorsLayer::new()
.allow_methods(vec![Method::GET, Method::POST, Method::OPTIONS])
.allow_origin(tower_http::cors::any())
.allow_origin(tower_http::cors::Any)
.allow_credentials(false);
let mut app = routes
.layer(axum::AddExtensionLayer::new(Arc::new(handler_ctx)))