apply and enforce rust-fmt

This commit is contained in:
Qingping Hou 2023-02-05 17:41:43 -08:00 committed by QP Hou
parent f9b17888cb
commit c91e9cbe57
9 changed files with 61 additions and 69 deletions

View File

@ -37,6 +37,8 @@ jobs:
override: true
- name: Check
run: cargo clippy
- name: Format
run: cargo fmt --check
- name: Build
run: cargo build
- name: Run tests

View File

@ -1,32 +1,35 @@
use std::convert::TryFrom;
use crate::io::BlobStoreType;
use std::collections::hash_map::Entry;
use std::collections::HashMap;
use std::convert::TryFrom;
use std::sync::Arc;
use datafusion::arrow;
use datafusion::arrow::array::as_string_array;
use datafusion::arrow::array::StringArray;
use datafusion::datasource::object_store::{ObjectStoreRegistry, ObjectStoreProvider};
use datafusion::datasource::object_store::{ObjectStoreProvider, ObjectStoreRegistry};
use datafusion::error::{DataFusionError, Result as DatafusionResult};
pub use datafusion::execution::context::SessionConfig;
use datafusion::execution::context::SessionContext;
use datafusion::execution::runtime_env::{RuntimeConfig, RuntimeEnv};
use datafusion::physical_plan::collect;
use object_store::aws::AmazonS3Builder;
use object_store::gcp::GoogleCloudStorageBuilder;
use object_store::azure::MicrosoftAzureBuilder;
use crate::error::{ColumnQError, QueryError};
use crate::query;
use crate::table::{self, KeyValueSource, TableSource};
use object_store::aws::AmazonS3Builder;
use object_store::azure::MicrosoftAzureBuilder;
use object_store::gcp::GoogleCloudStorageBuilder;
use url::Url;
pub struct ColumnQObjectStoreProvider {}
impl ObjectStoreProvider for ColumnQObjectStoreProvider {
fn get_by_url(&self, url: &Url) -> DatafusionResult<Arc<dyn object_store::ObjectStore>> {
match url.host_str() {
None => Err(DataFusionError::Execution(format!("Missing bucket name: {}", url.as_str()))),
None => Err(DataFusionError::Execution(format!(
"Missing bucket name: {}",
url.as_str()
))),
Some(host) => {
let url_schema = url.scheme();
match BlobStoreType::try_from(url_schema) {
@ -41,28 +44,28 @@ impl ObjectStoreProvider for ColumnQObjectStoreProvider {
Ok(s3) => Ok(Arc::new(s3)),
Err(err) => Err(DataFusionError::External(Box::new(err))),
}
},
}
BlobStoreType::GCS => {
let gcs_builder = GoogleCloudStorageBuilder::from_env().with_bucket_name(host);
let gcs_builder =
GoogleCloudStorageBuilder::from_env().with_bucket_name(host);
match gcs_builder.build() {
Ok(gcs) => Ok(Arc::new(gcs)),
Err(err) => Err(DataFusionError::External(Box::new(err))),
}
},
}
BlobStoreType::Azure => {
let azure_builder = MicrosoftAzureBuilder::from_env().with_container_name(host);
let azure_builder =
MicrosoftAzureBuilder::from_env().with_container_name(host);
match azure_builder.build() {
Ok(azure) => Ok(Arc::new(azure)),
Err(err) => Err(DataFusionError::External(Box::new(err))),
}
},
}
_ => Err(DataFusionError::Execution(format!(
"Unsupported scheme: {url_schema}"
))),
}
},
}
}
}
}
@ -81,8 +84,10 @@ impl ColumnQ {
pub fn new_with_config(config: SessionConfig) -> Self {
let object_store_provider = ColumnQObjectStoreProvider {};
let object_store_registry = ObjectStoreRegistry::new_with_provider(Some(Arc::new(object_store_provider)));
let rn_config = RuntimeConfig::new().with_object_store_registry(Arc::new(object_store_registry));
let object_store_registry =
ObjectStoreRegistry::new_with_provider(Some(Arc::new(object_store_provider)));
let rn_config =
RuntimeConfig::new().with_object_store_registry(Arc::new(object_store_registry));
let runtime_env = RuntimeEnv::new(rn_config).unwrap();
let dfctx = SessionContext::with_config_rt(config, Arc::new(runtime_env));
@ -204,10 +209,10 @@ impl Default for ColumnQ {
#[cfg(test)]
mod tests {
use tempfile::Builder;
use std::fs::File;
use std::io::Write;
use std::{env, str::FromStr};
use std::fs::File;
use tempfile::Builder;
use datafusion::datasource::object_store::ObjectStoreProvider;
use url::Url;
@ -225,8 +230,7 @@ mod tests {
assert!(err.to_string().contains("Generic S3 error: Missing region"));
env::set_var("AWS_REGION", "us-east-1");
let res = provider
.get_by_url(&Url::from_str(host_url).unwrap());
let res = provider.get_by_url(&Url::from_str(host_url).unwrap());
let msg = match res {
Err(e) => format!("{e}"),
Ok(_) => "".to_string(),
@ -251,16 +255,16 @@ mod tests {
let host_url = "gs://bucket_name/path";
let provider = ColumnQObjectStoreProvider {};
let tmp_dir = Builder::new()
.prefix("columnq.test.gcs")
.tempdir()?;
let tmp_dir = Builder::new().prefix("columnq.test.gcs").tempdir()?;
let tmp_gcs_path = tmp_dir.path().join("service_account.json");
let mut tmp_gcs = File::create(tmp_gcs_path.clone())?;
writeln!(tmp_gcs, r#"{{"gcs_base_url": "http://localhost:4443", "disable_oauth": true, "client_email": "", "private_key": ""}}"#)?;
writeln!(
tmp_gcs,
r#"{{"gcs_base_url": "http://localhost:4443", "disable_oauth": true, "client_email": "", "private_key": ""}}"#
)?;
env::set_var("GOOGLE_SERVICE_ACCOUNT", tmp_gcs_path);
let res = provider
.get_by_url(&Url::from_str(host_url).unwrap());
let res = provider.get_by_url(&Url::from_str(host_url).unwrap());
let msg = match res {
Err(e) => format!("{e}"),
Ok(_) => "".to_string(),
@ -281,8 +285,7 @@ mod tests {
env::set_var("AZURE_STORAGE_ACCOUNT_NAME", "devstoreaccount1");
env::set_var("AZURE_STORAGE_ACCOUNT_KEY", "Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==");
let res = provider
.get_by_url(&Url::from_str(host_url).unwrap());
let res = provider.get_by_url(&Url::from_str(host_url).unwrap());
let msg = match res {
Err(e) => format!("{e}"),
Ok(_) => "".to_string(),

View File

@ -19,9 +19,10 @@ where
"Invalid response from server: {resp:?}"
)));
}
let reader = std::io::Cursor::new(resp.bytes().await.map_err(|e| {
ColumnQError::HttpStore(format!("Failed to decode server response: {e}"))
})?);
let reader =
std::io::Cursor::new(resp.bytes().await.map_err(|e| {
ColumnQError::HttpStore(format!("Failed to decode server response: {e}"))
})?);
// HTTP store doesn't support directory listing, so we always only return a single partition
Ok(vec![partition_reader(reader)?])

View File

@ -1,25 +1,21 @@
use futures::TryStreamExt;
use std::str::FromStr;
use percent_encoding;
use url::Url;
use crate::table::TableSource;
use uriparse::URIReference;
use datafusion::datasource::object_store::ObjectStoreProvider;
use std::sync::Arc;
use crate::error::ColumnQError;
use object_store::ObjectStore;
use crate::columnq::ColumnQObjectStoreProvider;
use crate::error::ColumnQError;
use crate::table::TableSource;
use datafusion::datasource::object_store::ObjectStoreProvider;
use futures::TryStreamExt;
use object_store::ObjectStore;
use percent_encoding;
use std::str::FromStr;
use std::sync::Arc;
use uriparse::URIReference;
use url::Url;
pub async fn partition_key_to_reader(
client: Arc<dyn ObjectStore>,
path: &object_store::path::Path,
) -> Result<std::io::Cursor<Vec<u8>>, ColumnQError> {
let get_result = client
.get(path)
.await?;
let bytes = get_result
.bytes()
.await?;
let get_result = client.get(path).await?;
let bytes = get_result.bytes().await?;
Ok(std::io::Cursor::new(bytes.to_vec()))
}
@ -70,7 +66,8 @@ where
}
Err(_) => {
// fallback to directory listing
let paths = client.clone()
let paths = client
.clone()
.list(Some(&path))
.await?
.map_ok(|meta| meta.location)

View File

@ -37,9 +37,7 @@ pub async fn to_datafusion_table(t: &TableSource) -> Result<Arc<dyn TableProvide
.with_schema(schemaref);
Ok(Arc::new(ListingTable::try_new(table_config)?))
}
pub async fn to_mem_table(
t: &TableSource,
) -> Result<Arc<dyn TableProvider>, ColumnQError> {
pub async fn to_mem_table(t: &TableSource) -> Result<Arc<dyn TableProvider>, ColumnQError> {
let opt = t
.option
.clone()

View File

@ -246,9 +246,7 @@ async fn resolve_sheet_title<'a, 'b, 'c, 'd>(
// look up sheet title by sheet id through API
let resp = gs_api_get(
token,
&format!(
"https://sheets.googleapis.com/v4/spreadsheets/{spreadsheet_id}"
),
&format!("https://sheets.googleapis.com/v4/spreadsheets/{spreadsheet_id}"),
)
.await?
.error_for_status()

View File

@ -24,11 +24,7 @@ fn json_partition_to_vec(
if let Some(p) = pointer {
match value_ref.pointer(p) {
Some(v) => value_ref = v,
None => {
return Err(ColumnQError::LoadJson(format!(
"Invalid json pointer: {p}"
)))
}
None => return Err(ColumnQError::LoadJson(format!("Invalid json pointer: {p}"))),
}
}

View File

@ -16,8 +16,10 @@ use datafusion::datasource::TableProvider;
use datafusion::parquet::arrow::arrow_reader::ArrowReaderOptions;
use datafusion::parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
pub async fn to_datafusion_table(t: &TableSource, dfctx: &datafusion::execution::context::SessionContext) -> Result<Arc<dyn TableProvider>, ColumnQError> {
pub async fn to_datafusion_table(
t: &TableSource,
dfctx: &datafusion::execution::context::SessionContext,
) -> Result<Arc<dyn TableProvider>, ColumnQError> {
let opt = t
.option
.clone()
@ -31,9 +33,7 @@ pub async fn to_datafusion_table(t: &TableSource, dfctx: &datafusion::execution:
let options = ListingOptions::new(Arc::new(ParquetFormat::default()));
let schemaref = match &t.schema {
Some(s) => Arc::new(s.into()),
None => {
options.infer_schema(&dfctx.state(), &table_url).await?
}
None => options.infer_schema(&dfctx.state(), &table_url).await?,
};
let table_config = ListingTableConfig::new(table_url)
@ -115,7 +115,7 @@ mod tests {
.with_option(TableLoadOption::parquet(TableOptionParquet {
use_memory_table: false,
})),
&ctx
&ctx,
)
.await
.unwrap();

View File

@ -6,7 +6,6 @@ use anyhow::Result;
use async_process::Command;
use columnq::arrow::datatypes::Schema;
#[tokio::test]
async fn test_schema() -> Result<()> {
let json_table = helpers::get_spacex_table();
@ -270,9 +269,7 @@ async fn test_kvstore_get() -> Result<()> {
tokio::spawn(app.run_until_stopped());
let response = helpers::http_get(
&format!(
"{address}/api/kv/spacex_launch_name/600f9a8d8f798e2a4d5f979e"
),
&format!("{address}/api/kv/spacex_launch_name/600f9a8d8f798e2a4d5f979e"),
None,
)
.await;