diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index bf6ad31..2268bdc 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -150,6 +150,7 @@ jobs: $HOME/minio-binaries/mc alias set local http://127.0.0.1:9000 minioadmin minioadmin $HOME/minio-binaries/mc mb local/test-data $HOME/minio-binaries/mc cp test_data/blogs.parquet local/test-data + $HOME/minio-binaries/mc cp test_data/blogs.parquet local/test-data/blogs/ - name: Start roapi and Query env: AWS_DEFAULT_REGION: "us-east-1" @@ -203,6 +204,7 @@ jobs: $HOME/minio-binaries/mc alias set local http://127.0.0.1:9000 minioadmin minioadmin $HOME/minio-binaries/mc mb local/test-data $HOME/minio-binaries/mc cp test_data/blogs.parquet local/test-data + $HOME/minio-binaries/mc cp test_data/blogs.parquet local/test-data/blogs/ - name: Start roapi and Query env: AWS_DEFAULT_REGION: "us-east-1" diff --git a/columnq/src/error.rs b/columnq/src/error.rs index a948050..47076a0 100644 --- a/columnq/src/error.rs +++ b/columnq/src/error.rs @@ -42,6 +42,12 @@ pub enum ColumnQError { #[error("Error loading data from S3 store: {0}")] S3Store(String), + #[error("DataFusion error: {source}")] + ObjectStore { + #[from] + source: object_store::Error, + }, + #[error("DeltaTable error: {source}")] DeltaTable { #[from] diff --git a/columnq/src/io/mod.rs b/columnq/src/io/mod.rs index 90679b5..080f96e 100644 --- a/columnq/src/io/mod.rs +++ b/columnq/src/io/mod.rs @@ -4,6 +4,7 @@ pub mod fs; pub mod http; pub mod memory; pub mod s3; +pub mod object_store; use crate::error::ColumnQError; diff --git a/columnq/src/io/object_store.rs b/columnq/src/io/object_store.rs new file mode 100644 index 0000000..0b16b28 --- /dev/null +++ b/columnq/src/io/object_store.rs @@ -0,0 +1,62 @@ +use futures::TryStreamExt; +use std::str::FromStr; +use url::Url; +use crate::table::TableSource; +use uriparse::URIReference; +use datafusion::datasource::object_store::ObjectStoreProvider; +use std::sync::Arc; +use crate::error::ColumnQError; +use object_store::ObjectStore; +use crate::columnq::ColumnQObjectStoreProvider; + +pub async fn partition_key_to_reader( + client: Arc, + path: &object_store::path::Path, +) -> Result>, ColumnQError> { + let get_result = client + .get(path) + .await?; + let bytes = get_result + .bytes() + .await?; + Ok(std::io::Cursor::new(bytes.to_vec())) +} + +pub async fn partitions_from_uri<'a, F, T>( + t: &'a TableSource, + _uri: URIReference<'a>, + mut partition_reader: F, +) -> Result, ColumnQError> +where + F: FnMut(std::io::Cursor>) -> Result, +{ + let object_store_provider = ColumnQObjectStoreProvider {}; + let url = &Url::from_str(t.get_uri_str()).unwrap(); + let client = object_store_provider.get_by_url(url)?; + let mut partitions = vec![]; + + // first try loading table uri as single object + // url.path starts with "/", but object_store does not expect "/" at the beginning + let path = object_store::path::Path::from(&url.path()[1..]); + match partition_key_to_reader(client.clone(), &path).await { + Ok(reader) => { + partitions.push(partition_reader(reader)?); + } + Err(_) => { + // fallback to directory listing + let paths = client.clone() + .list(Some(&path)) + .await? + .map_ok(|meta| meta.location) + .try_collect::>() + .await + .unwrap(); + for f in paths { + let reader = partition_key_to_reader(client.clone(), &f).await?; + partitions.push(partition_reader(reader)?); + } + } + } + + Ok(partitions) +} diff --git a/columnq/src/lib.rs b/columnq/src/lib.rs index 3da9e2c..c1ac0e4 100644 --- a/columnq/src/lib.rs +++ b/columnq/src/lib.rs @@ -19,7 +19,7 @@ macro_rules! partitions_from_table_source { io::http::partitions_from_uri(&$table_source, uri, $call_with_r).await } io::BlobStoreType::S3 => { - io::s3::partitions_from_uri(&$table_source, uri, $call_with_r).await + io::object_store::partitions_from_uri(&$table_source, uri, $call_with_r).await } io::BlobStoreType::Memory => { io::memory::partitions_from_memory(&$table_source, $call_with_r).await diff --git a/test_end_to_end/query_blogs.sh b/test_end_to_end/query_blogs.sh index f626c94..13fc8c9 100755 --- a/test_end_to_end/query_blogs.sh +++ b/test_end_to_end/query_blogs.sh @@ -8,5 +8,17 @@ then exit 1 else echo "success" - exit 0 fi + +http_status=$(curl -o /dev/null -s -w "%{http_code}" -X POST -d "SELECT count(1) from blogs_dir" 127.0.0.1:8000/api/sql) + +echo $http_status +if [[ $http_status != "200" ]] +then + echo "error" + exit 1 +else + echo "success" +fi + +exit 0 diff --git a/test_end_to_end/test_s3_direct.yml b/test_end_to_end/test_s3_direct.yml index 3f19c7d..e3601e6 100644 --- a/test_end_to_end/test_s3_direct.yml +++ b/test_end_to_end/test_s3_direct.yml @@ -10,3 +10,9 @@ tables: format: "parquet" # following line is different from test_end_to_end/test_s3_memory.yml use_memory_table: false + - name: "blogs_dir" + uri: "s3://test-data/blogs/" + option: + format: "parquet" + # following line is different from test_end_to_end/test_s3_memory.yml + use_memory_table: false diff --git a/test_end_to_end/test_s3_memory.yml b/test_end_to_end/test_s3_memory.yml index 9748fba..e9ff187 100644 --- a/test_end_to_end/test_s3_memory.yml +++ b/test_end_to_end/test_s3_memory.yml @@ -9,3 +9,8 @@ tables: option: format: "parquet" use_memory_table: true + - name: "blogs_dir" + uri: "s3://test-data/blogs/" + option: + format: "parquet" + use_memory_table: true