diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index 2268bdc..6d5cac5 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -40,7 +40,8 @@ jobs: - name: Build run: cargo build - name: Run tests - run: cargo test + run: | + cargo test - name: Trim cache run: | which cargo-cache || cargo install cargo-cache @@ -72,7 +73,8 @@ jobs: toolchain: nightly-${{ env.RUST_TC_NIGHTLY_VER }} override: true - name: Run tests - run: cargo test --features simd + run: | + cargo test --features simd - name: Trim cache run: | which cargo-cache || cargo install cargo-cache @@ -107,13 +109,14 @@ jobs: - name: Build run: cargo build --features database - name: Run tests - run: cargo test --features database + run: | + cargo test --features database - name: Trim cache run: | which cargo-cache || cargo install cargo-cache cargo cache trim -l 1G - s3_memory_test: + object_store_memory_test: runs-on: ubuntu-latest env: RUSTFLAGS: "-C target-cpu=skylake" @@ -138,36 +141,45 @@ jobs: # toolchain: nightly toolchain: nightly-${{ env.RUST_TC_NIGHTLY_VER }} override: true - - name: Install minio server + - name: Install minio server (S3) run: | docker run -d -p 9000:9000 quay.io/minio/minio server /data - - name: Install minio client + - name: Install minio client (S3) run: | curl https://dl.min.io/client/mc/release/linux-amd64/mc --create-dirs -o $HOME/minio-binaries/mc chmod +x $HOME/minio-binaries/mc - - name: Create bucket and copy test data + - name: Create bucket and copy test data (S3) run: | $HOME/minio-binaries/mc alias set local http://127.0.0.1:9000 minioadmin minioadmin $HOME/minio-binaries/mc mb local/test-data $HOME/minio-binaries/mc cp test_data/blogs.parquet local/test-data $HOME/minio-binaries/mc cp test_data/blogs.parquet local/test-data/blogs/ + - name: Install fake gcs server (GCS) + run: | + docker run -d -p 4443:4443 fsouza/fake-gcs-server -scheme http + echo '{"gcs_base_url": "http://localhost:4443", "disable_oauth": true, "client_email": "", "private_key": ""}' > "/tmp/gcs.json" + - name: Create bucket and copy test data (GCS) + run: | + curl -XPOST --data-binary '{"name":"test-data"}' -H "Content-Type: application/json" "http://localhost:4443/storage/v1/b" + curl -XPOST --data-binary '@test_data/blogs.parquet' "http://localhost:4443/storage/v1/b/test-data/o?uploadType=media&name=blogs.parquet" - name: Start roapi and Query env: AWS_DEFAULT_REGION: "us-east-1" AWS_ACCESS_KEY_ID: minioadmin AWS_SECRET_ACCESS_KEY: minioadmin AWS_ENDPOINT_URL: http://127.0.0.1:9000 + GOOGLE_SERVICE_ACCOUNT: "/tmp/gcs.json" uses: BerniWittmann/background-server-action@v1.0.4 with: command: /home/runner/work/roapi/roapi/test_end_to_end/query_blogs.sh build: cargo build - start: cargo run --bin roapi -- -c test_end_to_end/test_s3_memory.yml + start: cargo run --bin roapi -- -c test_end_to_end/test_object_store_memory.yml wait-on: "http://127.0.0.1:8000/api/schema" # By default, wait-on will retry for 60 seconds. You can pass a custom timeout in seconds using wait-on-timeout. # 10 minutes = 600 seconds wait-on-timeout: 600 - s3_direct_test: + object_store_direct_test: runs-on: ubuntu-latest env: RUSTFLAGS: "-C target-cpu=skylake" @@ -192,30 +204,39 @@ jobs: # toolchain: nightly toolchain: nightly-${{ env.RUST_TC_NIGHTLY_VER }} override: true - - name: Install minio server + - name: Install minio server (S3) run: | docker run -d -p 9000:9000 quay.io/minio/minio server /data - - name: Install minio client + - name: Install minio client (S3) run: | curl https://dl.min.io/client/mc/release/linux-amd64/mc --create-dirs -o $HOME/minio-binaries/mc chmod +x $HOME/minio-binaries/mc - - name: Create bucket and copy test data + - name: Create bucket and copy test data (S3) run: | $HOME/minio-binaries/mc alias set local http://127.0.0.1:9000 minioadmin minioadmin $HOME/minio-binaries/mc mb local/test-data $HOME/minio-binaries/mc cp test_data/blogs.parquet local/test-data $HOME/minio-binaries/mc cp test_data/blogs.parquet local/test-data/blogs/ + - name: Install fake gcs server (GCS) + run: | + docker run -d -p 4443:4443 fsouza/fake-gcs-server -scheme http + echo '{"gcs_base_url": "http://localhost:4443", "disable_oauth": true, "client_email": "", "private_key": ""}' > "/tmp/gcs.json" + - name: Create bucket and copy test data (GCS) + run: | + curl -XPOST --data-binary '{"name":"test-data"}' -H "Content-Type: application/json" "http://localhost:4443/storage/v1/b" + curl -XPOST --data-binary '@test_data/blogs.parquet' "http://localhost:4443/storage/v1/b/test-data/o?uploadType=media&name=blogs.parquet" - name: Start roapi and Query env: AWS_DEFAULT_REGION: "us-east-1" AWS_ACCESS_KEY_ID: minioadmin AWS_SECRET_ACCESS_KEY: minioadmin AWS_ENDPOINT_URL: http://127.0.0.1:9000 + GOOGLE_SERVICE_ACCOUNT: "/tmp/gcs.json" uses: BerniWittmann/background-server-action@v1.0.4 with: command: /home/runner/work/roapi/roapi/test_end_to_end/query_blogs.sh build: cargo build - start: cargo run --bin roapi -- -c test_end_to_end/test_s3_direct.yml + start: cargo run --bin roapi -- -c test_end_to_end/test_object_store_direct.yml wait-on: "http://127.0.0.1:8000/api/schema" # By default, wait-on will retry for 60 seconds. You can pass a custom timeout in seconds using wait-on-timeout. # 10 minutes = 600 seconds diff --git a/Cargo.lock b/Cargo.lock index ba1616e..60e4367 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2811,6 +2811,7 @@ dependencies = [ "rand", "reqwest", "ring", + "rustls-pemfile 1.0.1", "serde", "serde_json", "snafu", diff --git a/columnq/Cargo.toml b/columnq/Cargo.toml index 18450a7..0be6712 100644 --- a/columnq/Cargo.toml +++ b/columnq/Cargo.toml @@ -16,7 +16,7 @@ path = "src/lib.rs" arrow-schema = { version ="26", features = ["serde"] } datafusion = "14" -object_store = { version = "0.5.3", features = ["aws_profile"] } +object_store = { version = "0.5.3", features = ["aws_profile", "gcp"] } url = "2.2" log = "0" diff --git a/columnq/src/columnq.rs b/columnq/src/columnq.rs index 193b198..7d91e5f 100644 --- a/columnq/src/columnq.rs +++ b/columnq/src/columnq.rs @@ -13,6 +13,7 @@ use datafusion::execution::runtime_env::{RuntimeConfig, RuntimeEnv}; use datafusion::physical_plan::collect; use object_store::aws::AmazonS3Builder; +use object_store::gcp::GoogleCloudStorageBuilder; use crate::error::{ColumnQError, QueryError}; use crate::query; use crate::table::{self, KeyValueSource, TableSource}; @@ -34,6 +35,15 @@ impl ObjectStoreProvider for ColumnQObjectStoreProvider { Err(err) => Err(DataFusionError::External(Box::new(err))), } }, + "gs" => { + let host = url.host_str().unwrap(); + let gcs_builder = GoogleCloudStorageBuilder::from_env().with_bucket_name(host); + match gcs_builder.build() { + Ok(gcs) => Ok(Arc::new(gcs)), + Err(err) => Err(DataFusionError::External(Box::new(err))), + } + + }, _ => Err(DataFusionError::Execution(format!( "Unsupported object store scheme {}", url_schema @@ -178,7 +188,10 @@ impl Default for ColumnQ { #[cfg(test)] mod tests { + use tempfile::Builder; + use std::io::Write; use std::{env, str::FromStr}; + use std::fs::File; use datafusion::datasource::object_store::ObjectStoreProvider; use url::Url; @@ -206,16 +219,31 @@ mod tests { env::remove_var("AWS_REGION"); } - #[test] - fn gcs_object_store_type() { + #[tokio::test] + async fn gcs_object_store_type() -> anyhow::Result<()> { let host_url = "gs://bucket_name/path"; let provider = ColumnQObjectStoreProvider {}; - let err = provider - .get_by_url(&Url::from_str(host_url).unwrap()) - .unwrap_err(); - assert!(err - .to_string() - .contains("Unsupported object store scheme gs")) + + let tmp_dir = Builder::new() + .prefix("columnq.test.gcs") + .tempdir()?; + let tmp_gcs_path = tmp_dir.path().join("service_account.json"); + let mut tmp_gcs = File::create(tmp_gcs_path.clone())?; + writeln!(tmp_gcs, r#"{{"gcs_base_url": "http://localhost:4443", "disable_oauth": true, "client_email": "", "private_key": ""}}"#)?; + env::set_var("GOOGLE_SERVICE_ACCOUNT", tmp_gcs_path.clone()); + + let res = provider + .get_by_url(&Url::from_str(host_url).unwrap()); + let msg = match res { + Err(e) => format!("{}", e), + Ok(_) => "".to_string(), + }; + assert_eq!("".to_string(), msg); + + drop(tmp_gcs); + tmp_dir.close()?; + env::remove_var("GOOGLE_SERVICE_ACCOUNT"); + Ok(()) } #[test] diff --git a/columnq/src/io/mod.rs b/columnq/src/io/mod.rs index 5fd5fd1..ec75134 100644 --- a/columnq/src/io/mod.rs +++ b/columnq/src/io/mod.rs @@ -11,6 +11,7 @@ use crate::error::ColumnQError; pub enum BlobStoreType { Http, S3, + GCS, FileSystem, Memory, } @@ -27,6 +28,7 @@ impl TryFrom>> for BlobStoreType { Some(uriparse::Scheme::HTTP) | Some(uriparse::Scheme::HTTPS) => Ok(BlobStoreType::Http), Some(uriparse::Scheme::Unregistered(s)) => match s.as_str() { "s3" => Ok(BlobStoreType::S3), + "gs" => Ok(BlobStoreType::GCS), "memory" => Ok(BlobStoreType::Memory), _ => Err(ColumnQError::InvalidUri(format!( "Unsupported scheme: {:?}", diff --git a/columnq/src/lib.rs b/columnq/src/lib.rs index c1ac0e4..847da17 100644 --- a/columnq/src/lib.rs +++ b/columnq/src/lib.rs @@ -18,7 +18,7 @@ macro_rules! partitions_from_table_source { io::BlobStoreType::Http => { io::http::partitions_from_uri(&$table_source, uri, $call_with_r).await } - io::BlobStoreType::S3 => { + io::BlobStoreType::S3 | io::BlobStoreType::GCS => { io::object_store::partitions_from_uri(&$table_source, uri, $call_with_r).await } io::BlobStoreType::Memory => { diff --git a/columnq/src/table/delta.rs b/columnq/src/table/delta.rs index a17d8e6..45a1f47 100644 --- a/columnq/src/table/delta.rs +++ b/columnq/src/table/delta.rs @@ -40,8 +40,8 @@ pub async fn to_delta_table( ) -> Result, ColumnQError> { match blob_type { io::BlobStoreType::FileSystem => Ok(Arc::new(delta_table)), - io::BlobStoreType::S3 => Err(ColumnQError::LoadDelta(format!( - "S3 for delta table currently only supported in conjunction with `to_memory_table` config: {}", + io::BlobStoreType::S3 | io::BlobStoreType::GCS => Err(ColumnQError::LoadDelta(format!( + "object_store for delta table currently only supported in conjunction with `to_memory_table` config: {}", delta_table.table_uri(), ))), _ => { @@ -92,7 +92,7 @@ pub async fn to_mem_table( read_partition::(r, batch_size) }, )?, - io::BlobStoreType::S3 => { + io::BlobStoreType::S3 | io::BlobStoreType::GCS => { io::object_store::partitions_from_path_iterator( path_iter, |r| -> Result, ColumnQError> { diff --git a/test_end_to_end/query_blogs.sh b/test_end_to_end/query_blogs.sh index 13fc8c9..8e5c328 100755 --- a/test_end_to_end/query_blogs.sh +++ b/test_end_to_end/query_blogs.sh @@ -1,5 +1,5 @@ #!/bin/bash -http_status=$(curl -o /dev/null -s -w "%{http_code}" -X POST -d "SELECT count(1) from blogs" 127.0.0.1:8000/api/sql) +http_status=$(curl -o /dev/null -s -w "%{http_code}" -X POST -d "SELECT count(1) from s3_blogs" 127.0.0.1:8000/api/sql) echo $http_status if [[ $http_status != "200" ]] @@ -10,7 +10,18 @@ else echo "success" fi -http_status=$(curl -o /dev/null -s -w "%{http_code}" -X POST -d "SELECT count(1) from blogs_dir" 127.0.0.1:8000/api/sql) +http_status=$(curl -o /dev/null -s -w "%{http_code}" -X POST -d "SELECT count(1) from s3_blogs_dir" 127.0.0.1:8000/api/sql) + +echo $http_status +if [[ $http_status != "200" ]] +then + echo "error" + exit 1 +else + echo "success" +fi + +http_status=$(curl -o /dev/null -s -w "%{http_code}" -X POST -d "SELECT count(1) from gcs_blogs" 127.0.0.1:8000/api/sql) echo $http_status if [[ $http_status != "200" ]] diff --git a/test_end_to_end/test_s3_direct.yml b/test_end_to_end/test_object_store_direct.yml similarity index 75% rename from test_end_to_end/test_s3_direct.yml rename to test_end_to_end/test_object_store_direct.yml index e3601e6..b6c3aae 100644 --- a/test_end_to_end/test_s3_direct.yml +++ b/test_end_to_end/test_object_store_direct.yml @@ -4,15 +4,20 @@ addr: # binding address for TCP port that speaks Postgres wire protocol postgres: 0.0.0.0:5432 tables: - - name: "blogs" + - name: "s3_blogs" uri: "s3://test-data/blogs.parquet" option: format: "parquet" # following line is different from test_end_to_end/test_s3_memory.yml use_memory_table: false - - name: "blogs_dir" + - name: "s3_blogs_dir" uri: "s3://test-data/blogs/" option: format: "parquet" # following line is different from test_end_to_end/test_s3_memory.yml use_memory_table: false + - name: "gcs_blogs" + uri: "gs://test-data/blogs.parquet" + option: + format: "parquet" + use_memory_table: false diff --git a/test_end_to_end/test_s3_memory.yml b/test_end_to_end/test_object_store_memory.yml similarity index 69% rename from test_end_to_end/test_s3_memory.yml rename to test_end_to_end/test_object_store_memory.yml index e9ff187..ae1d1f1 100644 --- a/test_end_to_end/test_s3_memory.yml +++ b/test_end_to_end/test_object_store_memory.yml @@ -4,13 +4,18 @@ addr: # binding address for TCP port that speaks Postgres wire protocol postgres: 0.0.0.0:5432 tables: - - name: "blogs" + - name: "s3_blogs" uri: "s3://test-data/blogs.parquet" option: format: "parquet" use_memory_table: true - - name: "blogs_dir" + - name: "s3_blogs_dir" uri: "s3://test-data/blogs/" option: format: "parquet" use_memory_table: true + - name: "gcs_blogs" + uri: "gs://test-data/blogs.parquet" + option: + format: "parquet" + use_memory_table: true