mirror of
https://github.com/roapi/roapi.git
synced 2026-06-05 21:04:02 +08:00
ci: fix lints from new rust release (#254)
This commit is contained in:
parent
bc5754a4ed
commit
2c128c8f7f
4
Cargo.lock
generated
4
Cargo.lock
generated
@ -969,7 +969,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "columnq-cli"
|
||||
version = "0.3.0"
|
||||
version = "0.4.0"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"clap",
|
||||
@ -3490,7 +3490,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "roapi"
|
||||
version = "0.8.1"
|
||||
version = "0.9.0"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"async-process",
|
||||
|
||||
@ -65,7 +65,7 @@ async fn console_loop(cq: &ColumnQ) -> anyhow::Result<()> {
|
||||
pretty::print_batches(&batches)?;
|
||||
}
|
||||
Err(e) => {
|
||||
println!("Error: {}", e);
|
||||
println!("Error: {e}");
|
||||
}
|
||||
},
|
||||
}
|
||||
@ -75,7 +75,7 @@ async fn console_loop(cq: &ColumnQ) -> anyhow::Result<()> {
|
||||
break;
|
||||
}
|
||||
Err(err) => {
|
||||
println!("Error: {:?}", err);
|
||||
println!("Error: {err:?}");
|
||||
break;
|
||||
}
|
||||
}
|
||||
@ -143,7 +143,7 @@ async fn cmd_sql(args: &clap::ArgMatches) -> anyhow::Result<()> {
|
||||
other => anyhow::bail!("unsupported output format: {}", other),
|
||||
},
|
||||
Err(e) => {
|
||||
println!("Error: {}", e);
|
||||
println!("Error: {e}");
|
||||
}
|
||||
},
|
||||
None => {
|
||||
|
||||
@ -58,8 +58,7 @@ impl ObjectStoreProvider for ColumnQObjectStoreProvider {
|
||||
}
|
||||
},
|
||||
_ => Err(DataFusionError::Execution(format!(
|
||||
"Unsupported scheme: {}",
|
||||
url_schema
|
||||
"Unsupported scheme: {url_schema}"
|
||||
))),
|
||||
}
|
||||
}
|
||||
@ -229,7 +228,7 @@ mod tests {
|
||||
let res = provider
|
||||
.get_by_url(&Url::from_str(host_url).unwrap());
|
||||
let msg = match res {
|
||||
Err(e) => format!("{}", e),
|
||||
Err(e) => format!("{e}"),
|
||||
Ok(_) => "".to_string(),
|
||||
};
|
||||
assert_eq!("".to_string(), msg);
|
||||
@ -258,12 +257,12 @@ mod tests {
|
||||
let tmp_gcs_path = tmp_dir.path().join("service_account.json");
|
||||
let mut tmp_gcs = File::create(tmp_gcs_path.clone())?;
|
||||
writeln!(tmp_gcs, r#"{{"gcs_base_url": "http://localhost:4443", "disable_oauth": true, "client_email": "", "private_key": ""}}"#)?;
|
||||
env::set_var("GOOGLE_SERVICE_ACCOUNT", tmp_gcs_path.clone());
|
||||
env::set_var("GOOGLE_SERVICE_ACCOUNT", tmp_gcs_path);
|
||||
|
||||
let res = provider
|
||||
.get_by_url(&Url::from_str(host_url).unwrap());
|
||||
let msg = match res {
|
||||
Err(e) => format!("{}", e),
|
||||
Err(e) => format!("{e}"),
|
||||
Ok(_) => "".to_string(),
|
||||
};
|
||||
assert_eq!("".to_string(), msg);
|
||||
@ -285,7 +284,7 @@ mod tests {
|
||||
let res = provider
|
||||
.get_by_url(&Url::from_str(host_url).unwrap());
|
||||
let msg = match res {
|
||||
Err(e) => format!("{}", e),
|
||||
Err(e) => format!("{e}"),
|
||||
Ok(_) => "".to_string(),
|
||||
};
|
||||
assert_eq!("".to_string(), msg);
|
||||
|
||||
@ -12,7 +12,7 @@ pub enum ContentType {
|
||||
}
|
||||
|
||||
impl ContentType {
|
||||
pub fn to_str<'a>(&'a self) -> &'static str {
|
||||
pub fn to_str(&self) -> &'static str {
|
||||
match self {
|
||||
ContentType::Json => "application/json",
|
||||
ContentType::Csv => "application/csv",
|
||||
|
||||
@ -87,19 +87,19 @@ pub enum ColumnQError {
|
||||
|
||||
impl ColumnQError {
|
||||
pub fn open_parquet_file(e: std::io::Error) -> Self {
|
||||
Self::LoadParquet(format!("Failed to open file: {}", e))
|
||||
Self::LoadParquet(format!("Failed to open file: {e}"))
|
||||
}
|
||||
|
||||
pub fn parquet_record_reader(e: parquet::errors::ParquetError) -> Self {
|
||||
Self::LoadParquet(format!("Failed to create record reader: {}", e))
|
||||
Self::LoadParquet(format!("Failed to create record reader: {e}"))
|
||||
}
|
||||
|
||||
pub fn parquet_file_reader(e: parquet::errors::ParquetError) -> Self {
|
||||
Self::LoadParquet(format!("Failed to create file reader: {}", e))
|
||||
Self::LoadParquet(format!("Failed to create file reader: {e}"))
|
||||
}
|
||||
|
||||
pub fn json_parse(e: serde_json::Error) -> Self {
|
||||
Self::LoadJson(format!("Failed to parse JSON data: {}", e))
|
||||
Self::LoadJson(format!("Failed to parse JSON data: {e}"))
|
||||
}
|
||||
|
||||
pub fn s3_obj_missing_key() -> Self {
|
||||
@ -133,56 +133,56 @@ impl QueryError {
|
||||
pub fn plan_sql(error: DataFusionError) -> Self {
|
||||
Self {
|
||||
error: "plan_sql".to_string(),
|
||||
message: format!("Failed to plan execution from SQL query: {}", error),
|
||||
message: format!("Failed to plan execution from SQL query: {error}"),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn invalid_sort(error: DataFusionError) -> Self {
|
||||
Self {
|
||||
error: "invalid_sort".to_string(),
|
||||
message: format!("Failed to apply sort operator: {}", error),
|
||||
message: format!("Failed to apply sort operator: {error}"),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn invalid_filter(error: DataFusionError) -> Self {
|
||||
Self {
|
||||
error: "invalid_filter".to_string(),
|
||||
message: format!("Failed to apply filter operator: {}", error),
|
||||
message: format!("Failed to apply filter operator: {error}"),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn invalid_limit(error: DataFusionError) -> Self {
|
||||
Self {
|
||||
error: "invalid_limit".to_string(),
|
||||
message: format!("Failed to apply limit operator: {}", error),
|
||||
message: format!("Failed to apply limit operator: {error}"),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn invalid_projection(error: DataFusionError) -> Self {
|
||||
Self {
|
||||
error: "invalid_projection".to_string(),
|
||||
message: format!("Failed to apply projection operator: {}", error),
|
||||
message: format!("Failed to apply projection operator: {error}"),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn query_exec(error: DataFusionError) -> Self {
|
||||
Self {
|
||||
error: "query_execution".to_string(),
|
||||
message: format!("Failed to execute query: {}", error),
|
||||
message: format!("Failed to execute query: {error}"),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn invalid_table(error: DataFusionError, table_name: &str) -> Self {
|
||||
Self {
|
||||
error: "invalid_table".to_string(),
|
||||
message: format!("Failed to load table {}: {}", table_name, error),
|
||||
message: format!("Failed to load table {table_name}: {error}"),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn invalid_kv_name(kv_name: &str) -> Self {
|
||||
Self {
|
||||
error: "invalid_kv_name".to_string(),
|
||||
message: format!("keyvalue store name `{}` doesn't exist", kv_name),
|
||||
message: format!("keyvalue store name `{kv_name}` doesn't exist"),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -20,7 +20,7 @@ where
|
||||
.map(|fpath| {
|
||||
debug!("loading file from path: {}", fpath);
|
||||
let reader = fs::File::open(fpath)
|
||||
.map_err(|e| ColumnQError::FileStore(format!("open file error: {}", e)))?;
|
||||
.map_err(|e| ColumnQError::FileStore(format!("open file error: {e}")))?;
|
||||
|
||||
partition_reader(reader)
|
||||
})
|
||||
@ -43,8 +43,7 @@ where
|
||||
debug!("building file list from path {}...", fs_path);
|
||||
let files = build_file_list(&fs_path, &file_ext).map_err(|e| {
|
||||
ColumnQError::FileStore(format!(
|
||||
"Failed to build file list from path `{}`: {}",
|
||||
fs_path, e
|
||||
"Failed to build file list from path `{fs_path}`: {e}"
|
||||
))
|
||||
})?;
|
||||
|
||||
|
||||
@ -16,12 +16,11 @@ where
|
||||
.map_err(|e| ColumnQError::HttpStore(e.to_string()))?;
|
||||
if resp.status().as_u16() / 100 != 2 {
|
||||
return Err(ColumnQError::HttpStore(format!(
|
||||
"Invalid response from server: {:?}",
|
||||
resp
|
||||
"Invalid response from server: {resp:?}"
|
||||
)));
|
||||
}
|
||||
let reader = std::io::Cursor::new(resp.bytes().await.map_err(|e| {
|
||||
ColumnQError::HttpStore(format!("Failed to decode server response: {}", e))
|
||||
ColumnQError::HttpStore(format!("Failed to decode server response: {e}"))
|
||||
})?);
|
||||
|
||||
// HTTP store doesn't support directory listing, so we always only return a single partition
|
||||
|
||||
@ -29,8 +29,7 @@ impl TryFrom<Option<&uriparse::Scheme<'_>>> for BlobStoreType {
|
||||
Some(uriparse::Scheme::HTTP) | Some(uriparse::Scheme::HTTPS) => Ok(BlobStoreType::Http),
|
||||
Some(uriparse::Scheme::Unregistered(s)) => BlobStoreType::try_from(s.as_str()),
|
||||
_ => Err(ColumnQError::InvalidUri(format!(
|
||||
"Unsupported scheme: {:?}",
|
||||
scheme
|
||||
"Unsupported scheme: {scheme:?}"
|
||||
))),
|
||||
}
|
||||
}
|
||||
@ -46,8 +45,7 @@ impl TryFrom<&str> for BlobStoreType {
|
||||
"az" | "adl" | "adfs" | "adfss" | "azure" => Ok(BlobStoreType::Azure),
|
||||
"memory" => Ok(BlobStoreType::Memory),
|
||||
_ => Err(ColumnQError::InvalidUri(format!(
|
||||
"Unsupported scheme: {:?}",
|
||||
scheme
|
||||
"Unsupported scheme: {scheme:?}"
|
||||
))),
|
||||
}
|
||||
}
|
||||
|
||||
@ -26,7 +26,7 @@ impl From<graphql_parser::query::ParseError> for QueryError {
|
||||
fn invalid_selection_set(error: datafusion::error::DataFusionError) -> QueryError {
|
||||
QueryError {
|
||||
error: "invalid_selection_set".to_string(),
|
||||
message: format!("failed to apply selection set for query: {}", error),
|
||||
message: format!("failed to apply selection set for query: {error}"),
|
||||
}
|
||||
}
|
||||
|
||||
@ -40,8 +40,8 @@ fn invalid_query(message: String) -> QueryError {
|
||||
// convert order list from graphql argument to datafusion sort columns
|
||||
//
|
||||
// sort order matters, thus it's modeled as a list
|
||||
fn to_datafusion_sort_columns<'a, 'b>(
|
||||
sort_columns: &'a [Value<'b, &'b str>],
|
||||
fn to_datafusion_sort_columns<'b>(
|
||||
sort_columns: &[Value<'b, &'b str>],
|
||||
) -> Result<Vec<Expr>, QueryError> {
|
||||
sort_columns
|
||||
.iter()
|
||||
@ -56,8 +56,7 @@ fn to_datafusion_sort_columns<'a, 'b>(
|
||||
}
|
||||
_ => {
|
||||
return Err(invalid_query(format!(
|
||||
"field in sort option should be a string, got: {}",
|
||||
optval,
|
||||
"field in sort option should be a string, got: {optval}",
|
||||
)));
|
||||
}
|
||||
};
|
||||
@ -68,25 +67,22 @@ fn to_datafusion_sort_columns<'a, 'b>(
|
||||
"desc" => Ok(column_sort_expr_desc(col.to_string())),
|
||||
"asc" => Ok(column_sort_expr_asc(col.to_string())),
|
||||
other => Err(invalid_query(format!(
|
||||
"sort order needs to be either `desc` or `asc`, got: {}",
|
||||
other,
|
||||
"sort order needs to be either `desc` or `asc`, got: {other}",
|
||||
))),
|
||||
},
|
||||
Some(v) => Err(invalid_query(format!(
|
||||
"sort order value should to be a String, got: {}",
|
||||
v,
|
||||
"sort order value should to be a String, got: {v}",
|
||||
))),
|
||||
}
|
||||
}
|
||||
other => Err(invalid_query(format!(
|
||||
"sort condition should be defined as object, got: {}",
|
||||
other,
|
||||
"sort condition should be defined as object, got: {other}",
|
||||
))),
|
||||
})
|
||||
.collect()
|
||||
}
|
||||
|
||||
fn operand_to_datafusion_expr<'a, 'b>(operand: &'a Value<'b, &'b str>) -> Result<Expr, QueryError> {
|
||||
fn operand_to_datafusion_expr<'b>(operand: &Value<'b, &'b str>) -> Result<Expr, QueryError> {
|
||||
match operand {
|
||||
Value::Boolean(b) => Ok(Expr::Literal(ScalarValue::Boolean(Some(*b)))),
|
||||
Value::String(s) => Ok(Expr::Literal(ScalarValue::Utf8(Some(s.to_string())))),
|
||||
@ -96,15 +92,13 @@ fn operand_to_datafusion_expr<'a, 'b>(operand: &'a Value<'b, &'b str>) -> Result
|
||||
Value::Int(n) => Ok(Expr::Literal(ScalarValue::Int64(Some(
|
||||
n.as_i64().ok_or_else(|| {
|
||||
invalid_query(format!(
|
||||
"invalid integer number in filter predicate: {}",
|
||||
operand
|
||||
"invalid integer number in filter predicate: {operand}"
|
||||
))
|
||||
})?,
|
||||
)))),
|
||||
Value::Float(f) => Ok(Expr::Literal(ScalarValue::Float64(Some(f.to_owned())))),
|
||||
other => Err(invalid_query(format!(
|
||||
"invalid operand in filter predicate: {}",
|
||||
other,
|
||||
"invalid operand in filter predicate: {other}",
|
||||
))),
|
||||
}
|
||||
}
|
||||
@ -123,9 +117,9 @@ fn operand_to_datafusion_expr<'a, 'b>(operand: &'a Value<'b, &'b str>) -> Result
|
||||
// col4
|
||||
// }
|
||||
// ```
|
||||
fn to_datafusion_predicates<'a, 'b>(
|
||||
col: &'b str,
|
||||
filter: &'a Value<'b, &'b str>,
|
||||
fn to_datafusion_predicates<'b>(
|
||||
col: &str,
|
||||
filter: &Value<'b, &'b str>,
|
||||
) -> Result<Vec<Expr>, QueryError> {
|
||||
match filter {
|
||||
Value::Object(obj) => obj
|
||||
@ -140,8 +134,7 @@ fn to_datafusion_predicates<'a, 'b>(
|
||||
"gt" => Ok(binary_expr(col_expr, Operator::Gt, right_expr)),
|
||||
"gte" | "gteq" => Ok(binary_expr(col_expr, Operator::GtEq, right_expr)),
|
||||
other => Err(invalid_query(format!(
|
||||
"invalid filter predicate operator, got: {}",
|
||||
other,
|
||||
"invalid filter predicate operator, got: {other}",
|
||||
))),
|
||||
}
|
||||
})
|
||||
@ -155,8 +148,7 @@ fn to_datafusion_predicates<'a, 'b>(
|
||||
)])
|
||||
}
|
||||
other => Err(invalid_query(format!(
|
||||
"filter predicate should be defined as object, got: {}",
|
||||
other,
|
||||
"filter predicate should be defined as object, got: {other}",
|
||||
))),
|
||||
}
|
||||
}
|
||||
@ -187,7 +179,7 @@ pub fn query_to_df(
|
||||
n => {
|
||||
return Err(QueryError {
|
||||
error: "invalid graphql query".to_string(),
|
||||
message: format!("only 1 definition allowed, got: {}", n),
|
||||
message: format!("only 1 definition allowed, got: {n}"),
|
||||
});
|
||||
}
|
||||
};
|
||||
@ -198,7 +190,7 @@ pub fn query_to_df(
|
||||
_ => {
|
||||
return Err(QueryError {
|
||||
error: "invalid graphql query".to_string(),
|
||||
message: format!("Unsupported operation: {}", def),
|
||||
message: format!("Unsupported operation: {def}"),
|
||||
});
|
||||
}
|
||||
}
|
||||
@ -256,7 +248,7 @@ pub fn query_to_df(
|
||||
}
|
||||
"page" => page = Some(value),
|
||||
other => {
|
||||
return Err(invalid_query(format!("invalid query argument: {}", other)));
|
||||
return Err(invalid_query(format!("invalid query argument: {other}")));
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -273,8 +265,7 @@ pub fn query_to_df(
|
||||
}
|
||||
other => {
|
||||
return Err(invalid_query(format!(
|
||||
"filter argument takes object as value, got: {}",
|
||||
other
|
||||
"filter argument takes object as value, got: {other}"
|
||||
)));
|
||||
}
|
||||
}
|
||||
@ -307,8 +298,7 @@ pub fn query_to_df(
|
||||
}
|
||||
other => {
|
||||
return Err(invalid_query(format!(
|
||||
"sort argument takes list as value, got: {}",
|
||||
other
|
||||
"sort argument takes list as value, got: {other}"
|
||||
)));
|
||||
}
|
||||
}
|
||||
@ -325,8 +315,7 @@ pub fn query_to_df(
|
||||
if let Value::Int(n) = value {
|
||||
n.as_i64().ok_or_else(|| {
|
||||
invalid_query(format!(
|
||||
"invalid 64bits integer number in limit argument: {}",
|
||||
value,
|
||||
"invalid 64bits integer number in limit argument: {value}",
|
||||
))
|
||||
})? - 1
|
||||
} else {
|
||||
@ -336,23 +325,21 @@ pub fn query_to_df(
|
||||
};
|
||||
let limit = n.as_i64().ok_or_else(|| {
|
||||
invalid_query(format!(
|
||||
"invalid 64bits integer number in limit argument: {}",
|
||||
value,
|
||||
"invalid 64bits integer number in limit argument: {value}",
|
||||
))
|
||||
})?;
|
||||
df = df
|
||||
.limit(
|
||||
(skip as usize) * limit as usize,
|
||||
Some(usize::try_from(limit).map_err(|_| {
|
||||
invalid_query(format!("limit value too large: {}", value))
|
||||
invalid_query(format!("limit value too large: {value}"))
|
||||
})?),
|
||||
)
|
||||
.map_err(QueryError::invalid_limit)?;
|
||||
}
|
||||
other => {
|
||||
return Err(invalid_query(format!(
|
||||
"limit argument takes int as value, got: {}",
|
||||
other,
|
||||
"limit argument takes int as value, got: {other}",
|
||||
)));
|
||||
}
|
||||
}
|
||||
|
||||
@ -13,7 +13,7 @@ use crate::query::{column_sort_expr_asc, column_sort_expr_desc};
|
||||
fn err_rest_query_value(error: sqlparser::tokenizer::TokenizerError) -> QueryError {
|
||||
QueryError {
|
||||
error: "rest_query_value".to_string(),
|
||||
message: format!("invalid REST query value {:?}", error),
|
||||
message: format!("invalid REST query value {error:?}"),
|
||||
}
|
||||
}
|
||||
|
||||
@ -36,13 +36,13 @@ fn rest_query_value_to_expr(v: &str) -> Result<Expr, QueryError> {
|
||||
} else {
|
||||
Err(QueryError {
|
||||
error: "rest_query_value".to_string(),
|
||||
message: format!("invalid REST query numeric value {}", s),
|
||||
message: format!("invalid REST query numeric value {s}"),
|
||||
})
|
||||
}
|
||||
}
|
||||
_ => Err(QueryError {
|
||||
error: "rest_query_value".to_string(),
|
||||
message: format!("invalid REST query value {}", v),
|
||||
message: format!("invalid REST query value {v}"),
|
||||
}),
|
||||
}
|
||||
}
|
||||
@ -50,7 +50,7 @@ fn rest_query_value_to_expr(v: &str) -> Result<Expr, QueryError> {
|
||||
fn num_parse_err(e: std::num::ParseIntError) -> QueryError {
|
||||
QueryError {
|
||||
error: "invalid_numeric_param".to_string(),
|
||||
message: format!("Failed to parse numeric parameter value: {}", e),
|
||||
message: format!("Failed to parse numeric parameter value: {e}"),
|
||||
}
|
||||
}
|
||||
|
||||
@ -78,7 +78,7 @@ pub fn table_query_to_df(
|
||||
None => {
|
||||
return Err(QueryError {
|
||||
error: "rest_query".to_string(),
|
||||
message: format!("missing column from filter `{}`", key),
|
||||
message: format!("missing column from filter `{key}`"),
|
||||
});
|
||||
}
|
||||
};
|
||||
@ -111,7 +111,7 @@ pub fn table_query_to_df(
|
||||
None => {
|
||||
return Err(QueryError {
|
||||
error: "rest_query".to_string(),
|
||||
message: format!("invalid filter condition {}", key),
|
||||
message: format!("invalid filter condition {key}"),
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
@ -84,13 +84,13 @@ async fn gs_api_get(token: &str, url: &str) -> Result<reqwest::Response, ColumnQ
|
||||
Client::builder()
|
||||
.build()
|
||||
.map_err(|e| {
|
||||
ColumnQError::GoogleSpreadsheets(format!("Failed to initialize HTTP client: {}", e))
|
||||
ColumnQError::GoogleSpreadsheets(format!("Failed to initialize HTTP client: {e}"))
|
||||
})?
|
||||
.get(url)
|
||||
.bearer_auth(token)
|
||||
.send()
|
||||
.await
|
||||
.map_err(|e| ColumnQError::GoogleSpreadsheets(format!("Failed to send API request: {}", e)))
|
||||
.map_err(|e| ColumnQError::GoogleSpreadsheets(format!("Failed to send API request: {e}")))
|
||||
}
|
||||
|
||||
fn coerce_type(l: DataType, r: DataType) -> DataType {
|
||||
@ -218,8 +218,7 @@ async fn fetch_auth_token(
|
||||
.await
|
||||
.map_err(|e| {
|
||||
ColumnQError::GoogleSpreadsheets(format!(
|
||||
"Error reading application secret from disk: {}",
|
||||
e
|
||||
"Error reading application secret from disk: {e}"
|
||||
))
|
||||
})?;
|
||||
|
||||
@ -228,15 +227,14 @@ async fn fetch_auth_token(
|
||||
.await
|
||||
.map_err(|e| {
|
||||
ColumnQError::GoogleSpreadsheets(format!(
|
||||
"Error building service account authenticator: {}",
|
||||
e
|
||||
"Error building service account authenticator: {e}"
|
||||
))
|
||||
})?;
|
||||
|
||||
let scopes = &["https://www.googleapis.com/auth/spreadsheets.readonly"];
|
||||
|
||||
sa.token(scopes).await.map_err(|e| {
|
||||
ColumnQError::GoogleSpreadsheets(format!("Failed to obtain OAuth2 token: {}", e))
|
||||
ColumnQError::GoogleSpreadsheets(format!("Failed to obtain OAuth2 token: {e}"))
|
||||
})
|
||||
}
|
||||
|
||||
@ -249,18 +247,17 @@ async fn resolve_sheet_title<'a, 'b, 'c, 'd>(
|
||||
let resp = gs_api_get(
|
||||
token,
|
||||
&format!(
|
||||
"https://sheets.googleapis.com/v4/spreadsheets/{}",
|
||||
spreadsheet_id
|
||||
"https://sheets.googleapis.com/v4/spreadsheets/{spreadsheet_id}"
|
||||
),
|
||||
)
|
||||
.await?
|
||||
.error_for_status()
|
||||
.map_err(|e| {
|
||||
ColumnQError::GoogleSpreadsheets(format!("Failed to resolve sheet title from API: {}", e))
|
||||
ColumnQError::GoogleSpreadsheets(format!("Failed to resolve sheet title from API: {e}"))
|
||||
})?;
|
||||
|
||||
let spreadsheets = resp.json::<Spreadsheets>().await.map_err(|e| {
|
||||
ColumnQError::GoogleSpreadsheets(format!("Failed to parse API response: {}", e))
|
||||
ColumnQError::GoogleSpreadsheets(format!("Failed to parse API response: {e}"))
|
||||
})?;
|
||||
|
||||
// when sheet id is not specified from config, try to parse it from URI
|
||||
@ -285,7 +282,7 @@ async fn resolve_sheet_title<'a, 'b, 'c, 'd>(
|
||||
.sheets
|
||||
.iter()
|
||||
.find(|s| s.properties.sheet_id == id)
|
||||
.ok_or_else(|| ColumnQError::GoogleSpreadsheets(format!("Invalid sheet id {}", id)))?,
|
||||
.ok_or_else(|| ColumnQError::GoogleSpreadsheets(format!("Invalid sheet id {id}")))?,
|
||||
// no sheet id specified, default to the first sheet
|
||||
None => spreadsheets
|
||||
.sheets
|
||||
@ -329,18 +326,17 @@ pub async fn to_mem_table(
|
||||
let resp = gs_api_get(
|
||||
token_str,
|
||||
&format!(
|
||||
"https://sheets.googleapis.com/v4/spreadsheets/{}/values/{}",
|
||||
spreadsheet_id, sheet_title,
|
||||
"https://sheets.googleapis.com/v4/spreadsheets/{spreadsheet_id}/values/{sheet_title}",
|
||||
),
|
||||
)
|
||||
.await?
|
||||
.error_for_status()
|
||||
.map_err(|e| {
|
||||
ColumnQError::GoogleSpreadsheets(format!("Failed to load sheet value from API: {}", e))
|
||||
ColumnQError::GoogleSpreadsheets(format!("Failed to load sheet value from API: {e}"))
|
||||
})?;
|
||||
|
||||
let sheet = resp.json::<SpreadsheetValues>().await.map_err(|e| {
|
||||
ColumnQError::GoogleSpreadsheets(format!("Failed to parse API response: {}", e))
|
||||
ColumnQError::GoogleSpreadsheets(format!("Failed to parse API response: {e}"))
|
||||
})?;
|
||||
|
||||
let batch = sheet_values_to_record_batch(&sheet.values)?;
|
||||
|
||||
@ -26,8 +26,7 @@ fn json_partition_to_vec(
|
||||
Some(v) => value_ref = v,
|
||||
None => {
|
||||
return Err(ColumnQError::LoadJson(format!(
|
||||
"Invalid json pointer: {}",
|
||||
p
|
||||
"Invalid json pointer: {p}"
|
||||
)))
|
||||
}
|
||||
}
|
||||
@ -55,7 +54,7 @@ fn json_vec_to_partition(
|
||||
json_rows.iter().map(|v| Ok(v.clone())),
|
||||
)
|
||||
.map_err(|e| {
|
||||
ColumnQError::LoadJson(format!("Failed to infer schema from JSON data: {}", e))
|
||||
ColumnQError::LoadJson(format!("Failed to infer schema from JSON data: {e}"))
|
||||
})?,
|
||||
};
|
||||
|
||||
@ -81,8 +80,7 @@ fn json_vec_to_partition(
|
||||
Ok(())
|
||||
}
|
||||
None => Err(arrow::error::ArrowError::JsonError(format!(
|
||||
"arry encoded JSON row missing column {:?} : {:?}",
|
||||
i, json_row
|
||||
"arry encoded JSON row missing column {i:?} : {json_row:?}"
|
||||
))),
|
||||
}
|
||||
})?;
|
||||
@ -94,7 +92,7 @@ fn json_vec_to_partition(
|
||||
};
|
||||
|
||||
while let Some(batch) = decoder.next_batch(&mut values_iter).map_err(|e| {
|
||||
ColumnQError::LoadJson(format!("Failed decode JSON into Arrow record batch: {}", e))
|
||||
ColumnQError::LoadJson(format!("Failed decode JSON into Arrow record batch: {e}"))
|
||||
})? {
|
||||
batches.push(batch);
|
||||
}
|
||||
@ -134,8 +132,7 @@ async fn to_partitions(
|
||||
match &pointer {
|
||||
Some(p) => {
|
||||
return Err(ColumnQError::LoadJson(format!(
|
||||
"{} points to an emtpy array",
|
||||
p
|
||||
"{p} points to an emtpy array"
|
||||
)));
|
||||
}
|
||||
None => {
|
||||
|
||||
@ -269,7 +269,7 @@ impl TableLoadOption {
|
||||
}
|
||||
}
|
||||
|
||||
pub fn extension<'a>(&'a self) -> &'static str {
|
||||
pub fn extension(&self) -> &'static str {
|
||||
match self {
|
||||
Self::json { .. } => "json",
|
||||
Self::ndjson { .. } => "ndjson",
|
||||
@ -305,8 +305,7 @@ impl TableIoSource {
|
||||
match self {
|
||||
Self::Memory(data) => Ok(data),
|
||||
other => Err(ColumnQError::Generic(format!(
|
||||
"expect memory IO source, got: {:?}",
|
||||
other
|
||||
"expect memory IO source, got: {other:?}"
|
||||
))),
|
||||
}
|
||||
}
|
||||
@ -315,7 +314,7 @@ impl TableIoSource {
|
||||
impl std::fmt::Display for TableIoSource {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
|
||||
match self {
|
||||
TableIoSource::Uri(uri) => write!(f, "uri({})", uri),
|
||||
TableIoSource::Uri(uri) => write!(f, "uri({uri})"),
|
||||
TableIoSource::Memory(_) => write!(f, "memory"),
|
||||
}
|
||||
}
|
||||
@ -407,26 +406,23 @@ impl TableSource {
|
||||
pub fn parsed_uri(&self) -> Result<URIReference, ColumnQError> {
|
||||
match &self.io_source {
|
||||
TableIoSource::Uri(uri) => URIReference::try_from(uri.as_str()).map_err(|_| {
|
||||
ColumnQError::InvalidUri(format!("{}. Make sure it's URI encoded.", uri))
|
||||
ColumnQError::InvalidUri(format!("{uri}. Make sure it's URI encoded."))
|
||||
}),
|
||||
TableIoSource::Memory(_) => URIReference::builder()
|
||||
.with_scheme(Some(uriparse::Scheme::try_from("memory").map_err(|e| {
|
||||
ColumnQError::Generic(format!(
|
||||
"failed to create uri scheme for memory IO source: {:?}",
|
||||
e
|
||||
"failed to create uri scheme for memory IO source: {e:?}"
|
||||
))
|
||||
})?))
|
||||
.with_path(uriparse::Path::try_from("data").map_err(|e| {
|
||||
ColumnQError::Generic(format!(
|
||||
"failed to create uri path for memory IO source: {:?}",
|
||||
e
|
||||
"failed to create uri path for memory IO source: {e:?}"
|
||||
))
|
||||
})?)
|
||||
.build()
|
||||
.map_err(|e| {
|
||||
ColumnQError::Generic(format!(
|
||||
"failed to create uri for memory IO source: {:?}",
|
||||
e
|
||||
"failed to create uri for memory IO source: {e:?}"
|
||||
))
|
||||
}),
|
||||
}
|
||||
@ -443,8 +439,7 @@ impl TableSource {
|
||||
"sqlite" | "sqlite3" | "db" => "sqlite",
|
||||
_ => {
|
||||
return Err(ColumnQError::InvalidUri(format!(
|
||||
"unsupported extension in uri: {}",
|
||||
uri
|
||||
"unsupported extension in uri: {uri}"
|
||||
)));
|
||||
}
|
||||
},
|
||||
@ -456,8 +451,7 @@ impl TableSource {
|
||||
Some(TableLoadOption::postgres {}) => "postgres",
|
||||
_ => {
|
||||
return Err(ColumnQError::InvalidUri(format!(
|
||||
"unsupported extension in uri: {}",
|
||||
uri
|
||||
"unsupported extension in uri: {uri}"
|
||||
)));
|
||||
}
|
||||
}
|
||||
@ -473,7 +467,10 @@ impl TableSource {
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn load(t: &TableSource, dfctx: &datafusion::execution::context::SessionContext) -> Result<Arc<dyn TableProvider>, ColumnQError> {
|
||||
pub async fn load(
|
||||
t: &TableSource,
|
||||
dfctx: &datafusion::execution::context::SessionContext,
|
||||
) -> Result<Arc<dyn TableProvider>, ColumnQError> {
|
||||
if let Some(opt) = &t.option {
|
||||
Ok(match opt {
|
||||
TableLoadOption::json { .. } => Arc::new(json::to_mem_table(t).await?),
|
||||
@ -529,7 +526,7 @@ pub fn parse_table_uri_arg(uri_arg: &str) -> Result<TableSource, ColumnQError> {
|
||||
|
||||
let uri = uri_args
|
||||
.next()
|
||||
.ok_or_else(|| ColumnQError::Generic(format!("invalid table URI argument: {}", uri_arg)))?;
|
||||
.ok_or_else(|| ColumnQError::Generic(format!("invalid table URI argument: {uri_arg}")))?;
|
||||
let split = uri.splitn(2, '=').collect::<Vec<&str>>();
|
||||
|
||||
let (table_name, uri) = match split.len() {
|
||||
@ -539,10 +536,10 @@ pub fn parse_table_uri_arg(uri_arg: &str) -> Result<TableSource, ColumnQError> {
|
||||
Some(s) => Ok(s),
|
||||
None => Path::new(uri)
|
||||
.file_name()
|
||||
.ok_or_else(|| ColumnQError::Generic(format!("invalid table URI: {}", uri))),
|
||||
.ok_or_else(|| ColumnQError::Generic(format!("invalid table URI: {uri}"))),
|
||||
}?
|
||||
.to_str()
|
||||
.ok_or_else(|| ColumnQError::Generic(format!("invalid table URI string: {}", uri)))?;
|
||||
.ok_or_else(|| ColumnQError::Generic(format!("invalid table URI string: {uri}")))?;
|
||||
|
||||
(table_name, uri)
|
||||
}
|
||||
@ -553,7 +550,7 @@ pub fn parse_table_uri_arg(uri_arg: &str) -> Result<TableSource, ColumnQError> {
|
||||
let t = if uri == "stdin" {
|
||||
let mut buffer = Vec::new();
|
||||
std::io::stdin().read_to_end(&mut buffer).map_err(|e| {
|
||||
ColumnQError::Generic(format!("Failed to read table data from stdin: {:?}", e))
|
||||
ColumnQError::Generic(format!("Failed to read table data from stdin: {e:?}"))
|
||||
})?;
|
||||
TableSource::new(table_name, TableIoSource::Memory(buffer))
|
||||
} else {
|
||||
@ -566,10 +563,10 @@ pub fn parse_table_uri_arg(uri_arg: &str) -> Result<TableSource, ColumnQError> {
|
||||
let mut parts = opt_str.splitn(2, '=');
|
||||
let opt_key = parts
|
||||
.next()
|
||||
.ok_or_else(|| ColumnQError::Generic(format!("invalid table option: {:?}", opt_str)))?;
|
||||
.ok_or_else(|| ColumnQError::Generic(format!("invalid table option: {opt_str:?}")))?;
|
||||
let opt_value = parts
|
||||
.next()
|
||||
.ok_or_else(|| ColumnQError::Generic(format!("invalid table option: {:?}", opt_str)))?;
|
||||
.ok_or_else(|| ColumnQError::Generic(format!("invalid table option: {opt_str:?}")))?;
|
||||
option_json.insert(
|
||||
opt_key.to_string(),
|
||||
serde_json::from_str(opt_value).unwrap_or_else(|_| opt_value.into()),
|
||||
@ -579,7 +576,7 @@ pub fn parse_table_uri_arg(uri_arg: &str) -> Result<TableSource, ColumnQError> {
|
||||
if !option_json.is_empty() {
|
||||
let opt: TableLoadOption = serde_json::from_value(serde_json::Value::Object(option_json))
|
||||
.map_err(|e| {
|
||||
ColumnQError::Generic(format!("Failed to parse table option: {:?}", e))
|
||||
ColumnQError::Generic(format!("Failed to parse table option: {e:?}"))
|
||||
})?;
|
||||
Ok(t.with_option(opt))
|
||||
} else {
|
||||
|
||||
@ -11,5 +11,5 @@ pub async fn get<H: RoapiContext>(
|
||||
) -> Result<impl IntoResponse, ApiErrResp> {
|
||||
ctx.kv_get(&kv_name, &key)
|
||||
.await?
|
||||
.ok_or_else(|| ApiErrResp::not_found(format!("key {} not found", key)))
|
||||
.ok_or_else(|| ApiErrResp::not_found(format!("key {key} not found")))
|
||||
}
|
||||
|
||||
@ -120,7 +120,7 @@ pub fn get_configuration() -> Result<Config, anyhow::Error> {
|
||||
None => Config::default(),
|
||||
Some(config_path) => {
|
||||
let config_content = fs::read_to_string(config_path)
|
||||
.with_context(|| format!("Failed to read config file: {}", config_path))?;
|
||||
.with_context(|| format!("Failed to read config file: {config_path}"))?;
|
||||
|
||||
serde_yaml::from_str(&config_content).context("Failed to parse YAML config")?
|
||||
}
|
||||
|
||||
@ -77,7 +77,7 @@ impl ApiErrResp {
|
||||
Self {
|
||||
code: http::StatusCode::BAD_REQUEST,
|
||||
error: "read_query".to_string(),
|
||||
message: format!("Failed to decode utf-8 query: {}", error),
|
||||
message: format!("Failed to decode utf-8 query: {error}"),
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@ -74,7 +74,7 @@ pub struct PostgresServer<H: RoapiContext> {
|
||||
|
||||
impl<H: RoapiContext> PostgresServer<H> {
|
||||
pub async fn new(ctx: Arc<H>, config: &Config, default_host: String) -> Self {
|
||||
let default_addr = format!("{}:5432", default_host);
|
||||
let default_addr = format!("{default_host}:5432");
|
||||
|
||||
let addr = config
|
||||
.addr
|
||||
|
||||
@ -5,7 +5,7 @@ use std::collections::HashMap;
|
||||
use anyhow::Result;
|
||||
use async_process::Command;
|
||||
use columnq::arrow::datatypes::Schema;
|
||||
use tokio;
|
||||
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_schema() -> Result<()> {
|
||||
@ -13,7 +13,7 @@ async fn test_schema() -> Result<()> {
|
||||
let (app, address) = helpers::test_api_app_with_tables(vec![json_table]).await;
|
||||
tokio::spawn(app.run_until_stopped());
|
||||
|
||||
let response = helpers::http_get(&format!("{}/api/schema", address), None).await;
|
||||
let response = helpers::http_get(&format!("{address}/api/schema"), None).await;
|
||||
|
||||
assert_eq!(response.status(), 200);
|
||||
let body = response.json::<HashMap<String, Schema>>().await?;
|
||||
@ -28,7 +28,7 @@ async fn test_uk_cities_sql_post() -> Result<()> {
|
||||
tokio::spawn(app.run_until_stopped());
|
||||
|
||||
let response = helpers::http_post(
|
||||
&format!("{}/api/sql", address),
|
||||
&format!("{address}/api/sql"),
|
||||
"SELECT city FROM uk_cities WHERE lat > 52 and lat < 53 and lng < -1",
|
||||
)
|
||||
.await;
|
||||
@ -54,7 +54,7 @@ async fn test_sql_invalid_post() -> Result<()> {
|
||||
let (app, address) = helpers::test_api_app_with_tables(vec![table]).await;
|
||||
tokio::spawn(app.run_until_stopped());
|
||||
|
||||
let response = helpers::http_post(&format!("{}/api/sql", address), "SELECT city FROM").await;
|
||||
let response = helpers::http_post(&format!("{address}/api/sql"), "SELECT city FROM").await;
|
||||
|
||||
assert_eq!(response.status(), 400);
|
||||
let data = response.json::<serde_json::Value>().await?;
|
||||
@ -76,7 +76,7 @@ async fn test_ubuntu_ami_sql_post() -> Result<()> {
|
||||
tokio::spawn(app.run_until_stopped());
|
||||
|
||||
let response = helpers::http_post(
|
||||
&format!("{}/api/sql", address),
|
||||
&format!("{address}/api/sql"),
|
||||
"SELECT ami_id FROM ubuntu_ami \
|
||||
WHERE version='12.04 LTS' \
|
||||
AND arch = 'amd64' \
|
||||
@ -111,14 +111,13 @@ async fn test_rest_get() -> Result<()> {
|
||||
for accept_header in accept_headers {
|
||||
let response = helpers::http_get(
|
||||
&format!(
|
||||
"{}/api/tables/ubuntu_ami?\
|
||||
"{address}/api/tables/ubuntu_ami?\
|
||||
columns=name,version,release&\
|
||||
filter[arch]='amd64'&\
|
||||
filter[zone]eq='us-west-2'&\
|
||||
filter[instance_type]eq='hvm:ebs-ssd'&\
|
||||
sort=-version,release\
|
||||
",
|
||||
address
|
||||
"
|
||||
),
|
||||
accept_header,
|
||||
)
|
||||
@ -150,7 +149,7 @@ async fn test_graphql_post_query_op() -> Result<()> {
|
||||
tokio::spawn(app.run_until_stopped());
|
||||
|
||||
let response = helpers::http_post(
|
||||
&format!("{}/api/graphql", address),
|
||||
&format!("{address}/api/graphql"),
|
||||
r#"query {
|
||||
ubuntu_ami(
|
||||
filter: {
|
||||
@ -196,7 +195,7 @@ async fn test_graphql_post_selection() -> Result<()> {
|
||||
tokio::spawn(app.run_until_stopped());
|
||||
|
||||
let response = helpers::http_post(
|
||||
&format!("{}/api/graphql", address),
|
||||
&format!("{address}/api/graphql"),
|
||||
r#"{
|
||||
ubuntu_ami(
|
||||
filter: {
|
||||
@ -246,7 +245,7 @@ async fn test_http2() -> Result<()> {
|
||||
.arg("-s")
|
||||
.arg("-I")
|
||||
.arg("--http2-prior-knowledge")
|
||||
.arg(format!("{}/api/schema", address))
|
||||
.arg(format!("{address}/api/schema"))
|
||||
.arg("-o")
|
||||
.arg("/dev/null")
|
||||
.arg("-w")
|
||||
@ -272,8 +271,7 @@ async fn test_kvstore_get() -> Result<()> {
|
||||
|
||||
let response = helpers::http_get(
|
||||
&format!(
|
||||
"{}/api/kv/spacex_launch_name/600f9a8d8f798e2a4d5f979e",
|
||||
address
|
||||
"{address}/api/kv/spacex_launch_name/600f9a8d8f798e2a4d5f979e"
|
||||
),
|
||||
None,
|
||||
)
|
||||
|
||||
@ -1,7 +1,7 @@
|
||||
mod helpers;
|
||||
|
||||
use anyhow::Result;
|
||||
use tokio;
|
||||
|
||||
use tokio_postgres::NoTls;
|
||||
|
||||
#[tokio::test]
|
||||
@ -18,7 +18,7 @@ async fn test_postgres_count() -> Result<()> {
|
||||
// so spawn it off to run on its own.
|
||||
tokio::spawn(async move {
|
||||
if let Err(e) = connection.await {
|
||||
eprintln!("connection error: {}", e);
|
||||
eprintln!("connection error: {e}");
|
||||
}
|
||||
});
|
||||
|
||||
|
||||
Loading…
Reference in New Issue
Block a user