Native GCS support (#248)

* Set object_store minimal version and turn on aws_profile feature

* Add native GCS support with GOOGLE_APPLICATION_CREDENTIALS

* hacky: create /tmp/gcs.json for test

* use GoogleCloudStorageBuilder::from_env

* create temp file inside test for GCS

Co-authored-by: QP Hou <dave2008713@gmail.com>
This commit is contained in:
Rich 2023-01-22 02:42:47 -05:00 committed by GitHub
parent e4f00489d1
commit fffc8558a8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 105 additions and 32 deletions

View File

@ -40,7 +40,8 @@ jobs:
- name: Build
run: cargo build
- name: Run tests
run: cargo test
run: |
cargo test
- name: Trim cache
run: |
which cargo-cache || cargo install cargo-cache
@ -72,7 +73,8 @@ jobs:
toolchain: nightly-${{ env.RUST_TC_NIGHTLY_VER }}
override: true
- name: Run tests
run: cargo test --features simd
run: |
cargo test --features simd
- name: Trim cache
run: |
which cargo-cache || cargo install cargo-cache
@ -107,13 +109,14 @@ jobs:
- name: Build
run: cargo build --features database
- name: Run tests
run: cargo test --features database
run: |
cargo test --features database
- name: Trim cache
run: |
which cargo-cache || cargo install cargo-cache
cargo cache trim -l 1G
s3_memory_test:
object_store_memory_test:
runs-on: ubuntu-latest
env:
RUSTFLAGS: "-C target-cpu=skylake"
@ -138,36 +141,45 @@ jobs:
# toolchain: nightly
toolchain: nightly-${{ env.RUST_TC_NIGHTLY_VER }}
override: true
- name: Install minio server
- name: Install minio server (S3)
run: |
docker run -d -p 9000:9000 quay.io/minio/minio server /data
- name: Install minio client
- name: Install minio client (S3)
run: |
curl https://dl.min.io/client/mc/release/linux-amd64/mc --create-dirs -o $HOME/minio-binaries/mc
chmod +x $HOME/minio-binaries/mc
- name: Create bucket and copy test data
- name: Create bucket and copy test data (S3)
run: |
$HOME/minio-binaries/mc alias set local http://127.0.0.1:9000 minioadmin minioadmin
$HOME/minio-binaries/mc mb local/test-data
$HOME/minio-binaries/mc cp test_data/blogs.parquet local/test-data
$HOME/minio-binaries/mc cp test_data/blogs.parquet local/test-data/blogs/
- name: Install fake gcs server (GCS)
run: |
docker run -d -p 4443:4443 fsouza/fake-gcs-server -scheme http
echo '{"gcs_base_url": "http://localhost:4443", "disable_oauth": true, "client_email": "", "private_key": ""}' > "/tmp/gcs.json"
- name: Create bucket and copy test data (GCS)
run: |
curl -XPOST --data-binary '{"name":"test-data"}' -H "Content-Type: application/json" "http://localhost:4443/storage/v1/b"
curl -XPOST --data-binary '@test_data/blogs.parquet' "http://localhost:4443/storage/v1/b/test-data/o?uploadType=media&name=blogs.parquet"
- name: Start roapi and Query
env:
AWS_DEFAULT_REGION: "us-east-1"
AWS_ACCESS_KEY_ID: minioadmin
AWS_SECRET_ACCESS_KEY: minioadmin
AWS_ENDPOINT_URL: http://127.0.0.1:9000
GOOGLE_SERVICE_ACCOUNT: "/tmp/gcs.json"
uses: BerniWittmann/background-server-action@v1.0.4
with:
command: /home/runner/work/roapi/roapi/test_end_to_end/query_blogs.sh
build: cargo build
start: cargo run --bin roapi -- -c test_end_to_end/test_s3_memory.yml
start: cargo run --bin roapi -- -c test_end_to_end/test_object_store_memory.yml
wait-on: "http://127.0.0.1:8000/api/schema"
# By default, wait-on will retry for 60 seconds. You can pass a custom timeout in seconds using wait-on-timeout.
# 10 minutes = 600 seconds
wait-on-timeout: 600
s3_direct_test:
object_store_direct_test:
runs-on: ubuntu-latest
env:
RUSTFLAGS: "-C target-cpu=skylake"
@ -192,30 +204,39 @@ jobs:
# toolchain: nightly
toolchain: nightly-${{ env.RUST_TC_NIGHTLY_VER }}
override: true
- name: Install minio server
- name: Install minio server (S3)
run: |
docker run -d -p 9000:9000 quay.io/minio/minio server /data
- name: Install minio client
- name: Install minio client (S3)
run: |
curl https://dl.min.io/client/mc/release/linux-amd64/mc --create-dirs -o $HOME/minio-binaries/mc
chmod +x $HOME/minio-binaries/mc
- name: Create bucket and copy test data
- name: Create bucket and copy test data (S3)
run: |
$HOME/minio-binaries/mc alias set local http://127.0.0.1:9000 minioadmin minioadmin
$HOME/minio-binaries/mc mb local/test-data
$HOME/minio-binaries/mc cp test_data/blogs.parquet local/test-data
$HOME/minio-binaries/mc cp test_data/blogs.parquet local/test-data/blogs/
- name: Install fake gcs server (GCS)
run: |
docker run -d -p 4443:4443 fsouza/fake-gcs-server -scheme http
echo '{"gcs_base_url": "http://localhost:4443", "disable_oauth": true, "client_email": "", "private_key": ""}' > "/tmp/gcs.json"
- name: Create bucket and copy test data (GCS)
run: |
curl -XPOST --data-binary '{"name":"test-data"}' -H "Content-Type: application/json" "http://localhost:4443/storage/v1/b"
curl -XPOST --data-binary '@test_data/blogs.parquet' "http://localhost:4443/storage/v1/b/test-data/o?uploadType=media&name=blogs.parquet"
- name: Start roapi and Query
env:
AWS_DEFAULT_REGION: "us-east-1"
AWS_ACCESS_KEY_ID: minioadmin
AWS_SECRET_ACCESS_KEY: minioadmin
AWS_ENDPOINT_URL: http://127.0.0.1:9000
GOOGLE_SERVICE_ACCOUNT: "/tmp/gcs.json"
uses: BerniWittmann/background-server-action@v1.0.4
with:
command: /home/runner/work/roapi/roapi/test_end_to_end/query_blogs.sh
build: cargo build
start: cargo run --bin roapi -- -c test_end_to_end/test_s3_direct.yml
start: cargo run --bin roapi -- -c test_end_to_end/test_object_store_direct.yml
wait-on: "http://127.0.0.1:8000/api/schema"
# By default, wait-on will retry for 60 seconds. You can pass a custom timeout in seconds using wait-on-timeout.
# 10 minutes = 600 seconds

1
Cargo.lock generated
View File

@ -2811,6 +2811,7 @@ dependencies = [
"rand",
"reqwest",
"ring",
"rustls-pemfile 1.0.1",
"serde",
"serde_json",
"snafu",

View File

@ -16,7 +16,7 @@ path = "src/lib.rs"
arrow-schema = { version ="26", features = ["serde"] }
datafusion = "14"
object_store = { version = "0.5.3", features = ["aws_profile"] }
object_store = { version = "0.5.3", features = ["aws_profile", "gcp"] }
url = "2.2"
log = "0"

View File

@ -13,6 +13,7 @@ use datafusion::execution::runtime_env::{RuntimeConfig, RuntimeEnv};
use datafusion::physical_plan::collect;
use object_store::aws::AmazonS3Builder;
use object_store::gcp::GoogleCloudStorageBuilder;
use crate::error::{ColumnQError, QueryError};
use crate::query;
use crate::table::{self, KeyValueSource, TableSource};
@ -34,6 +35,15 @@ impl ObjectStoreProvider for ColumnQObjectStoreProvider {
Err(err) => Err(DataFusionError::External(Box::new(err))),
}
},
"gs" => {
let host = url.host_str().unwrap();
let gcs_builder = GoogleCloudStorageBuilder::from_env().with_bucket_name(host);
match gcs_builder.build() {
Ok(gcs) => Ok(Arc::new(gcs)),
Err(err) => Err(DataFusionError::External(Box::new(err))),
}
},
_ => Err(DataFusionError::Execution(format!(
"Unsupported object store scheme {}",
url_schema
@ -178,7 +188,10 @@ impl Default for ColumnQ {
#[cfg(test)]
mod tests {
use tempfile::Builder;
use std::io::Write;
use std::{env, str::FromStr};
use std::fs::File;
use datafusion::datasource::object_store::ObjectStoreProvider;
use url::Url;
@ -206,16 +219,31 @@ mod tests {
env::remove_var("AWS_REGION");
}
#[test]
fn gcs_object_store_type() {
#[tokio::test]
async fn gcs_object_store_type() -> anyhow::Result<()> {
let host_url = "gs://bucket_name/path";
let provider = ColumnQObjectStoreProvider {};
let err = provider
.get_by_url(&Url::from_str(host_url).unwrap())
.unwrap_err();
assert!(err
.to_string()
.contains("Unsupported object store scheme gs"))
let tmp_dir = Builder::new()
.prefix("columnq.test.gcs")
.tempdir()?;
let tmp_gcs_path = tmp_dir.path().join("service_account.json");
let mut tmp_gcs = File::create(tmp_gcs_path.clone())?;
writeln!(tmp_gcs, r#"{{"gcs_base_url": "http://localhost:4443", "disable_oauth": true, "client_email": "", "private_key": ""}}"#)?;
env::set_var("GOOGLE_SERVICE_ACCOUNT", tmp_gcs_path.clone());
let res = provider
.get_by_url(&Url::from_str(host_url).unwrap());
let msg = match res {
Err(e) => format!("{}", e),
Ok(_) => "".to_string(),
};
assert_eq!("".to_string(), msg);
drop(tmp_gcs);
tmp_dir.close()?;
env::remove_var("GOOGLE_SERVICE_ACCOUNT");
Ok(())
}
#[test]

View File

@ -11,6 +11,7 @@ use crate::error::ColumnQError;
pub enum BlobStoreType {
Http,
S3,
GCS,
FileSystem,
Memory,
}
@ -27,6 +28,7 @@ impl TryFrom<Option<&uriparse::Scheme<'_>>> for BlobStoreType {
Some(uriparse::Scheme::HTTP) | Some(uriparse::Scheme::HTTPS) => Ok(BlobStoreType::Http),
Some(uriparse::Scheme::Unregistered(s)) => match s.as_str() {
"s3" => Ok(BlobStoreType::S3),
"gs" => Ok(BlobStoreType::GCS),
"memory" => Ok(BlobStoreType::Memory),
_ => Err(ColumnQError::InvalidUri(format!(
"Unsupported scheme: {:?}",

View File

@ -18,7 +18,7 @@ macro_rules! partitions_from_table_source {
io::BlobStoreType::Http => {
io::http::partitions_from_uri(&$table_source, uri, $call_with_r).await
}
io::BlobStoreType::S3 => {
io::BlobStoreType::S3 | io::BlobStoreType::GCS => {
io::object_store::partitions_from_uri(&$table_source, uri, $call_with_r).await
}
io::BlobStoreType::Memory => {

View File

@ -40,8 +40,8 @@ pub async fn to_delta_table(
) -> Result<Arc<dyn TableProvider>, ColumnQError> {
match blob_type {
io::BlobStoreType::FileSystem => Ok(Arc::new(delta_table)),
io::BlobStoreType::S3 => Err(ColumnQError::LoadDelta(format!(
"S3 for delta table currently only supported in conjunction with `to_memory_table` config: {}",
io::BlobStoreType::S3 | io::BlobStoreType::GCS => Err(ColumnQError::LoadDelta(format!(
"object_store for delta table currently only supported in conjunction with `to_memory_table` config: {}",
delta_table.table_uri(),
))),
_ => {
@ -92,7 +92,7 @@ pub async fn to_mem_table(
read_partition::<std::fs::File>(r, batch_size)
},
)?,
io::BlobStoreType::S3 => {
io::BlobStoreType::S3 | io::BlobStoreType::GCS => {
io::object_store::partitions_from_path_iterator(
path_iter,
|r| -> Result<Vec<RecordBatch>, ColumnQError> {

View File

@ -1,5 +1,5 @@
#!/bin/bash
http_status=$(curl -o /dev/null -s -w "%{http_code}" -X POST -d "SELECT count(1) from blogs" 127.0.0.1:8000/api/sql)
http_status=$(curl -o /dev/null -s -w "%{http_code}" -X POST -d "SELECT count(1) from s3_blogs" 127.0.0.1:8000/api/sql)
echo $http_status
if [[ $http_status != "200" ]]
@ -10,7 +10,18 @@ else
echo "success"
fi
http_status=$(curl -o /dev/null -s -w "%{http_code}" -X POST -d "SELECT count(1) from blogs_dir" 127.0.0.1:8000/api/sql)
http_status=$(curl -o /dev/null -s -w "%{http_code}" -X POST -d "SELECT count(1) from s3_blogs_dir" 127.0.0.1:8000/api/sql)
echo $http_status
if [[ $http_status != "200" ]]
then
echo "error"
exit 1
else
echo "success"
fi
http_status=$(curl -o /dev/null -s -w "%{http_code}" -X POST -d "SELECT count(1) from gcs_blogs" 127.0.0.1:8000/api/sql)
echo $http_status
if [[ $http_status != "200" ]]

View File

@ -4,15 +4,20 @@ addr:
# binding address for TCP port that speaks Postgres wire protocol
postgres: 0.0.0.0:5432
tables:
- name: "blogs"
- name: "s3_blogs"
uri: "s3://test-data/blogs.parquet"
option:
format: "parquet"
# following line is different from test_end_to_end/test_s3_memory.yml
use_memory_table: false
- name: "blogs_dir"
- name: "s3_blogs_dir"
uri: "s3://test-data/blogs/"
option:
format: "parquet"
# following line is different from test_end_to_end/test_s3_memory.yml
use_memory_table: false
- name: "gcs_blogs"
uri: "gs://test-data/blogs.parquet"
option:
format: "parquet"
use_memory_table: false

View File

@ -4,13 +4,18 @@ addr:
# binding address for TCP port that speaks Postgres wire protocol
postgres: 0.0.0.0:5432
tables:
- name: "blogs"
- name: "s3_blogs"
uri: "s3://test-data/blogs.parquet"
option:
format: "parquet"
use_memory_table: true
- name: "blogs_dir"
- name: "s3_blogs_dir"
uri: "s3://test-data/blogs/"
option:
format: "parquet"
use_memory_table: true
- name: "gcs_blogs"
uri: "gs://test-data/blogs.parquet"
option:
format: "parquet"
use_memory_table: true