Fallback to application/json encoding (#104)

* feat: fallback to application/json encoding

* fix: update tower-http git hash

* Update roapi-http/src/api/mod.rs

* build: clean roapi-http output in default build

* fix: tower-http cors Any -> any()

Co-authored-by: QP Hou <dave2008713@gmail.com>
This commit is contained in:
Erwin Kroon 2021-11-13 20:06:58 +01:00 committed by GitHub
parent b661dbe996
commit 56b8d0b8d3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 66 additions and 123 deletions

View File

@ -35,6 +35,8 @@ jobs:
profile: default
toolchain: stable
override: true
- name: Clean roapi-http
run: cargo clean -p roapi-http
- name: Check
run: cargo clippy
- name: Build

72
Cargo.lock generated
View File

@ -2,27 +2,6 @@
# It is not intended for manual editing.
version = 3
[[package]]
name = "actix-macros"
version = "0.2.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "465a6172cf69b960917811022d8f29bc0b7fa1398bc4f78b3c466673db1213b6"
dependencies = [
"quote",
"syn",
]
[[package]]
name = "actix-rt"
version = "2.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ea360596a50aa9af459850737f99293e5cb9114ae831118cb6026b3bbc7583ad"
dependencies = [
"actix-macros",
"futures-core",
"tokio",
]
[[package]]
name = "adler"
version = "1.0.2"
@ -139,9 +118,9 @@ checksum = "cdb031dd78e28731d87d56cc8ffef4a8f36ca26c38fe2de700543e627f8a464a"
[[package]]
name = "axum"
version = "0.3.0"
version = "0.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "49c3f630b925c7a85089ff794fdce495c88c80d38710f31eb9817c8399fd77ce"
checksum = "c5e874ee652f2ec443faed3073b80f0ac7a2042a3605fc0704d28bbbf22d623c"
dependencies = [
"async-trait",
"bitflags",
@ -1096,15 +1075,6 @@ dependencies = [
"hashbrown",
]
[[package]]
name = "instant"
version = "0.1.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7a5bbe824c507c5da5956355e86a746d82e0e1464f65d862cc5e71da70e94b2c"
dependencies = [
"cfg-if 1.0.0",
]
[[package]]
name = "integer-encoding"
version = "1.1.7"
@ -1229,15 +1199,6 @@ version = "0.5.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7fb9b38af92608140b86b693604b9ffcc5824240a484d1ecd4795bacb2fe88f3"
[[package]]
name = "lock_api"
version = "0.4.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "712a4d093c9976e24e7dbca41db895dabcbac38eb5f4045393d17a95bdfb1109"
dependencies = [
"scopeguard",
]
[[package]]
name = "log"
version = "0.4.14"
@ -1611,31 +1572,6 @@ dependencies = [
"libm",
]
[[package]]
name = "parking_lot"
version = "0.11.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7d17b78036a60663b797adeaee46f5c9dfebb86948d1255007a1d6be0271ff99"
dependencies = [
"instant",
"lock_api",
"parking_lot_core",
]
[[package]]
name = "parking_lot_core"
version = "0.8.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d76e8e1493bcac0d2766c42737f34458f1c8c50c0d23bcb24ea953affb273216"
dependencies = [
"cfg-if 1.0.0",
"instant",
"libc",
"redox_syscall",
"smallvec",
"winapi",
]
[[package]]
name = "parquet"
version = "6.0.0-SNAPSHOT"
@ -1983,7 +1919,6 @@ dependencies = [
name = "roapi-http"
version = "0.4.4"
dependencies = [
"actix-rt",
"anyhow",
"axum",
"clap",
@ -2809,7 +2744,6 @@ dependencies = [
"mio",
"num_cpus",
"once_cell",
"parking_lot",
"pin-project-lite",
"signal-hook-registry",
"tokio-macros",
@ -2909,7 +2843,7 @@ dependencies = [
[[package]]
name = "tower-http"
version = "0.1.1"
source = "git+https://github.com/tower-rs/tower-http.git?branch=cors#9f09fc55bf74b5b9a8bd79cbede7e73d37a3e79b"
source = "git+https://github.com/tower-rs/tower-http.git?branch=cors#2f9b5985f49f978a952c0dbc28594091c247f8ac"
dependencies = [
"bytes",
"futures-core",

View File

@ -21,7 +21,9 @@ impl TryFrom<Option<&uriparse::Scheme<'_>>> for BlobStoreType {
fn try_from(scheme: Option<&uriparse::Scheme<'_>>) -> Result<Self, Self::Error> {
match scheme {
// default to local file when schema is not provided
None | Some(uriparse::Scheme::FileSystem) | Some(uriparse::Scheme::File) => Ok(BlobStoreType::FileSystem),
None | Some(uriparse::Scheme::FileSystem) | Some(uriparse::Scheme::File) => {
Ok(BlobStoreType::FileSystem)
}
Some(uriparse::Scheme::HTTP) | Some(uriparse::Scheme::HTTPS) => Ok(BlobStoreType::Http),
Some(uriparse::Scheme::Unregistered(s)) => match s.as_str() {
"s3" => Ok(BlobStoreType::S3),
@ -40,12 +42,11 @@ impl TryFrom<Option<&uriparse::Scheme<'_>>> for BlobStoreType {
}
}
#[cfg(test)]
mod tests {
use uriparse::*;
use std::convert::TryFrom;
use crate::io::BlobStoreType;
use std::convert::TryFrom;
use uriparse::*;
#[test]
fn path_test() {
@ -74,4 +75,4 @@ mod tests {
let blob_type = BlobStoreType::try_from(uri_ref.scheme()).unwrap();
assert_eq!(blob_type, BlobStoreType::FileSystem);
}
}
}

View File

@ -47,7 +47,6 @@ simd = ["columnq/simd"]
snmalloc = ["snmalloc-rs"]
[dev-dependencies]
actix-rt = "*"
reqwest = { version = "0.11", default-features = false, features = ["json", "rustls-tls"]}
# TODO: uncomment this when we exclude roapi-http from root workspace

View File

@ -15,7 +15,7 @@ pub async fn post(
body: Bytes,
) -> Result<Response<Body>, ApiErrResp> {
let ctx = state.0;
let encode_type = encode_type_from_hdr(headers)?;
let encode_type = encode_type_from_hdr(headers);
let graphq = std::str::from_utf8(&body).map_err(ApiErrResp::read_query)?;
let batches = ctx.cq.query_graphql(graphq).await?;
encode_record_batches(encode_type, &batches)

View File

@ -1,11 +1,11 @@
use std::convert::TryFrom;
use axum::body::Body;
use axum::http;
use axum::http::header;
use axum::http::Response;
use columnq::datafusion::arrow;
use columnq::encoding;
use columnq::encoding::ContentType;
use columnq::ColumnQ;
use log::info;
@ -50,17 +50,11 @@ pub fn bytes_to_json_resp(bytes: Vec<u8>) -> Response<Body> {
bytes_to_resp(bytes, "application/json")
}
pub fn encode_type_from_hdr(
headers: header::HeaderMap,
) -> Result<encoding::ContentType, ApiErrResp> {
pub fn encode_type_from_hdr(headers: header::HeaderMap) -> encoding::ContentType {
match headers.get(header::ACCEPT) {
None => Ok(encoding::ContentType::Json),
None => encoding::ContentType::Json,
Some(hdr_value) => {
encoding::ContentType::try_from(hdr_value.as_bytes()).map_err(|_| ApiErrResp {
code: http::StatusCode::BAD_REQUEST,
error: "unsupported_content_type".to_string(),
message: format!("{:?} is not a supported response content type", hdr_value),
})
encoding::ContentType::try_from(hdr_value.as_bytes()).unwrap_or(ContentType::Json)
}
}
}

View File

@ -17,7 +17,7 @@ pub async fn get_table(
extract::Query(params): extract::Query<HashMap<String, String>>,
) -> Result<Response<Body>, ApiErrResp> {
let ctx = &state.0;
let encode_type = encode_type_from_hdr(headers)?;
let encode_type = encode_type_from_hdr(headers);
let batches = ctx.cq.query_rest_table(&table_name, &params).await?;
encode_record_batches(encode_type, &batches)
}

View File

@ -15,7 +15,7 @@ pub async fn post(
body: Bytes,
) -> Result<Response<Body>, ApiErrResp> {
let ctx = state.0;
let encode_type = encode_type_from_hdr(headers)?;
let encode_type = encode_type_from_hdr(headers);
let sql = std::str::from_utf8(&body).map_err(ApiErrResp::read_query)?;
let batches = ctx.cq.query_sql(sql).await?;
encode_record_batches(encode_type, &batches)

View File

@ -30,7 +30,7 @@ impl Application {
let routes = api::routes::register_app_routes();
let cors = tower_http::cors::CorsLayer::new()
.allow_methods(vec![Method::GET, Method::POST, Method::OPTIONS])
.allow_origin(tower_http::cors::Any)
.allow_origin(tower_http::cors::any())
.allow_credentials(false);
let mut app = routes
.layer(axum::AddExtensionLayer::new(Arc::new(handler_ctx)))

View File

@ -6,13 +6,13 @@ use anyhow::Result;
use columnq::arrow::datatypes::Schema;
use tokio;
#[actix_rt::test]
#[tokio::test]
async fn test_schema() -> Result<()> {
let json_table = helpers::get_spacex_table();
let (app, address) = helpers::test_api_app(vec![json_table]).await;
tokio::spawn(app.run_until_stopped());
let response = helpers::http_get(&format!("{}/api/schema", address)).await;
let response = helpers::http_get(&format!("{}/api/schema", address), None).await;
assert_eq!(response.status(), 200);
let body = response.json::<HashMap<String, Schema>>().await?;
@ -20,7 +20,7 @@ async fn test_schema() -> Result<()> {
Ok(())
}
#[actix_rt::test]
#[tokio::test]
async fn test_uk_cities_sql_post() -> Result<()> {
let table = helpers::get_uk_cities_table();
let (app, address) = helpers::test_api_app(vec![table]).await;
@ -47,7 +47,7 @@ async fn test_uk_cities_sql_post() -> Result<()> {
Ok(())
}
#[actix_rt::test]
#[tokio::test]
async fn test_sql_invalid_post() -> Result<()> {
let table = helpers::get_uk_cities_table();
let (app, address) = helpers::test_api_app(vec![table]).await;
@ -68,7 +68,7 @@ async fn test_sql_invalid_post() -> Result<()> {
Ok(())
}
#[actix_rt::test]
#[tokio::test]
async fn test_ubuntu_ami_sql_post() -> Result<()> {
let table = helpers::get_ubuntu_ami_table();
let (app, address) = helpers::test_api_app(vec![table]).await;
@ -95,43 +95,54 @@ async fn test_ubuntu_ami_sql_post() -> Result<()> {
Ok(())
}
#[actix_rt::test]
#[tokio::test]
async fn test_rest_get() -> Result<()> {
let table = helpers::get_ubuntu_ami_table();
let (app, address) = helpers::test_api_app(vec![table]).await;
tokio::spawn(app.run_until_stopped());
let accept_headers = vec![
None,
Some("application/json"),
Some("text/html,application/xhtml+xml,application/xml;q=0.9"),
Some("text/html,application/xhtml+xml,application/xml;q=0.9,application/json;q=0.5"),
];
let response = helpers::http_get(&format!(
"{}/api/tables/ubuntu_ami?\
for accept_header in accept_headers {
let response = helpers::http_get(
&format!(
"{}/api/tables/ubuntu_ami?\
columns=name,version,release&\
filter[arch]='amd64'&\
filter[zone]eq='us-west-2'&\
filter[instance_type]eq='hvm:ebs-ssd'&\
sort=-version,release\
",
address
))
.await;
address
),
accept_header,
)
.await;
assert_eq!(response.status(), 200);
let data = response.json::<serde_json::Value>().await?;
assert_eq!(
data,
serde_json::json!([
{ "release": "20201205", "version": "20.10", "name": "groovy" },
{ "release": "20201201", "version": "20.04 LTS", "name": "focal" },
{ "release": "20200716.1", "version": "19.10", "name": "eoan" },
{ "release": "20200115", "version": "19.04", "name": "disco" },
{ "release": "20201201", "version": "18.04 LTS", "name": "bionic" },
{ "release": "20201202.1", "version": "16.04 LTS", "name": "xenial" },
{ "release": "20191107", "version": "14.04 LTS", "name": "trusty" },
{ "release": "20170502", "version": "12.04 LTS", "name": "precise" }
])
);
assert_eq!(response.status(), 200);
let data = response.json::<serde_json::Value>().await?;
assert_eq!(
data,
serde_json::json!([
{ "release": "20201205", "version": "20.10", "name": "groovy" },
{ "release": "20201201", "version": "20.04 LTS", "name": "focal" },
{ "release": "20200716.1", "version": "19.10", "name": "eoan" },
{ "release": "20200115", "version": "19.04", "name": "disco" },
{ "release": "20201201", "version": "18.04 LTS", "name": "bionic" },
{ "release": "20201202.1", "version": "16.04 LTS", "name": "xenial" },
{ "release": "20191107", "version": "14.04 LTS", "name": "trusty" },
{ "release": "20170502", "version": "12.04 LTS", "name": "precise" }
])
);
}
Ok(())
}
#[actix_rt::test]
#[tokio::test]
async fn test_graphql_post_query_op() -> Result<()> {
let table = helpers::get_ubuntu_ami_table();
let (app, address) = helpers::test_api_app(vec![table]).await;
@ -177,7 +188,7 @@ async fn test_graphql_post_query_op() -> Result<()> {
Ok(())
}
#[actix_rt::test]
#[tokio::test]
async fn test_graphql_post_selection() -> Result<()> {
let table = helpers::get_ubuntu_ami_table();
let (app, address) = helpers::test_api_app(vec![table]).await;

View File

@ -27,12 +27,14 @@ pub async fn test_api_app(tables: Vec<TableSource>) -> (Application, String) {
(app, address)
}
pub async fn http_get(url: &str) -> reqwest::Response {
reqwest::Client::new()
.get(url)
.send()
.await
.expect("Unable to execute GET request")
pub async fn http_get(url: &str, accept: Option<&str>) -> reqwest::Response {
let request = reqwest::Client::new().get(url);
let request = if let Some(accept) = accept {
request.header("Accept", accept)
} else {
request
};
request.send().await.expect("Unable to execute GET request")
}
pub async fn http_post(url: &str, payload: impl Into<reqwest::Body>) -> reqwest::Response {