From 9eba0281c66a5e94e4dcf0a4785b3ee1ca522265 Mon Sep 17 00:00:00 2001 From: QP Hou Date: Sun, 26 May 2024 00:33:01 -0700 Subject: [PATCH] support http for delta tables (#333) --- .github/workflows/build.yml | 2 +- columnq/src/table/delta.rs | 8 +++-- roapi/tests/partitioned_table_test.rs | 35 ++++++++++++++++++-- test_end_to_end/query_blogs.sh | 8 +++++ test_end_to_end/test_object_store_direct.yml | 15 +++++++++ test_end_to_end/test_object_store_memory.yml | 15 +++++++++ 6 files changed, 78 insertions(+), 5 deletions(-) diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index 1983920..193f694 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -142,7 +142,7 @@ jobs: command: /home/runner/work/roapi/roapi/test_end_to_end/query_blogs.sh build: mold -run cargo build start: mold -run cargo run --bin roapi -- -c test_end_to_end/test_object_store_memory.yml - wait-on: "http://127.0.0.1:8000/api/schema" + wait-on: "http://127.0.0.1:8000/health" # By default, wait-on will retry for 60 seconds. You can pass a custom timeout in seconds using wait-on-timeout. # 10 minutes = 600 seconds wait-on-timeout: 600 diff --git a/columnq/src/table/delta.rs b/columnq/src/table/delta.rs index 7147534..0fa6e1f 100644 --- a/columnq/src/table/delta.rs +++ b/columnq/src/table/delta.rs @@ -60,9 +60,13 @@ pub async fn to_datafusion_table( let TableOptionDelta { use_memory_table } = opt.as_delta()?; let uri_str = t.get_uri_str(); - let delta_table = deltalake::open_table(uri_str) - .await + let delta_table = deltalake::DeltaTableBuilder::from_valid_uri(uri_str) .context(OpenTableSnafu) + .context(table::LoadDeltaSnafu)? 
+ .with_allow_http(true) + .load() + .await + .context(LoadTableSnafu) .context(table::LoadDeltaSnafu)?; let parsed_uri = t.parsed_uri()?; let url_scheme = parsed_uri.scheme(); diff --git a/roapi/tests/partitioned_table_test.rs b/roapi/tests/partitioned_table_test.rs index 42c3347..728f02f 100644 --- a/roapi/tests/partitioned_table_test.rs +++ b/roapi/tests/partitioned_table_test.rs @@ -3,8 +3,7 @@ mod helpers; use columnq::datafusion::arrow::datatypes::DataType; use columnq::table::{TableColumn, TableLoadOption, TableOptionCsv, TableSource}; -#[tokio::test] -async fn test_partitioned_csv_table() { +fn partitioned_csv_table() -> TableSource { let table_path = helpers::test_data_path("partitioned_csv"); let table = TableSource::new("partitioned_csv".to_string(), table_path) .with_option(TableLoadOption::csv( @@ -23,6 +22,13 @@ async fn test_partitioned_csv_table() { }, ]); + table +} + +#[tokio::test] +async fn test_partitioned_csv_table() { + let table = partitioned_csv_table(); + let (app, address) = helpers::test_api_app_with_tables(vec![table]).await; tokio::spawn(app.run_until_stopped()); @@ -47,3 +53,28 @@ async fn test_partitioned_csv_table() { ); assert_eq!(status, 200); } + +#[tokio::test] +async fn test_partitioned_csv_table_filter_by_partition() { + let table = partitioned_csv_table(); + + let (app, address) = helpers::test_api_app_with_tables(vec![table]).await; + tokio::spawn(app.run_until_stopped()); + + let response = helpers::http_post( + &format!("{address}/api/sql"), + "SELECT * FROM partitioned_csv WHERE year = '2022' AND month = 12 AND value > 1", + ) + .await; + + let status = response.status(); + let data = response.json::<serde_json::Value>().await.unwrap(); + assert_eq!( + data, + serde_json::json!([ + {"year": 2022, "month": 12, "ts": 101, "value": 7.8}, + {"year": 2022, "month": 12, "ts": 102, "value": 4.0}, + ]) + ); + assert_eq!(status, 200); +} diff --git a/test_end_to_end/query_blogs.sh b/test_end_to_end/query_blogs.sh index ddbf8d1..1310588 100755 --- 
a/test_end_to_end/query_blogs.sh +++ b/test_end_to_end/query_blogs.sh @@ -15,17 +15,25 @@ function check_status() { SQL_ENDPOINT="127.0.0.1:8000/api/sql" +echo "Test s3 blogs..." http_status=$(curl -o /dev/null -s -w "%{http_code}" -X POST -d "SELECT count(1) from s3_blogs" "${SQL_ENDPOINT}") check_status ${http_status} +echo "Test s3 blogs space encoded..." http_status=$(curl -o /dev/null -s -w "%{http_code}" -X POST -d "SELECT count(1) from s3_blogs_space_encode" "${SQL_ENDPOINT}") check_status ${http_status} +echo "Test s3 blogs in a directory..." http_status=$(curl -o /dev/null -s -w "%{http_code}" -X POST -d "SELECT count(1) from s3_blogs_dir" "${SQL_ENDPOINT}") check_status ${http_status} +http_status=$(curl -o /dev/null -s -w "%{http_code}" -X POST -d "SELECT count(*) from s3_delta WHERE reply_id is null" "${SQL_ENDPOINT}") +check_status ${http_status} + +echo "Test gcs blogs..." http_status=$(curl -o /dev/null -s -w "%{http_code}" -X POST -d "SELECT count(1) from gcs_blogs" "${SQL_ENDPOINT}") check_status ${http_status} +echo "Test azure blogs..." 
http_status=$(curl -o /dev/null -s -w "%{http_code}" -X POST -d "SELECT count(1) from azure_blogs" "${SQL_ENDPOINT}") check_status ${http_status} diff --git a/test_end_to_end/test_object_store_direct.yml b/test_end_to_end/test_object_store_direct.yml index 6c3856d..8f62faf 100644 --- a/test_end_to_end/test_object_store_direct.yml +++ b/test_end_to_end/test_object_store_direct.yml @@ -21,6 +21,21 @@ tables: format: "parquet" # following line is different from test_end_to_end/test_s3_memory.yml use_memory_table: false + - name: "s3_partitioned" + uri: "s3://test-data/partitioned_blogs/" + partition_columns: + - name: year + data_type: UInt16 + - name: month + data_type: UInt8 + option: + format: "parquet" + use_memory_table: false + - name: "s3_delta" + uri: "s3://test-data/blogs-delta" + option: + format: "delta" + use_memory_table: false - name: "gcs_blogs" uri: "gs://test-data/blogs.parquet" option: diff --git a/test_end_to_end/test_object_store_memory.yml b/test_end_to_end/test_object_store_memory.yml index 6a03858..a56a0ff 100644 --- a/test_end_to_end/test_object_store_memory.yml +++ b/test_end_to_end/test_object_store_memory.yml @@ -19,6 +19,21 @@ tables: option: format: "parquet" use_memory_table: true + - name: "s3_partitioned" + uri: "s3://test-data/partitioned_blogs/" + partition_columns: + - name: year + data_type: UInt16 + - name: month + data_type: UInt8 + option: + format: "parquet" + use_memory_table: true + - name: "s3_delta" + uri: "s3://test-data/blogs-delta" + option: + format: "delta" + use_memory_table: true - name: "gcs_blogs" uri: "gs://test-data/blogs.parquet" option: