support http for delta tables (#333)

This commit is contained in:
QP Hou 2024-05-26 00:33:01 -07:00 committed by GitHub
parent a46351c4d4
commit 9eba0281c6
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 78 additions and 5 deletions

View File

@ -142,7 +142,7 @@ jobs:
command: /home/runner/work/roapi/roapi/test_end_to_end/query_blogs.sh
build: mold -run cargo build
start: mold -run cargo run --bin roapi -- -c test_end_to_end/test_object_store_memory.yml
wait-on: "http://127.0.0.1:8000/api/schema"
wait-on: "http://127.0.0.1:8000/health"
# By default, wait-on will retry for 60 seconds. You can pass a custom timeout in seconds using wait-on-timeout.
# 10 minutes = 600 seconds
wait-on-timeout: 600

View File

@ -60,9 +60,13 @@ pub async fn to_datafusion_table(
let TableOptionDelta { use_memory_table } = opt.as_delta()?;
let uri_str = t.get_uri_str();
let delta_table = deltalake::open_table(uri_str)
.await
let delta_table = deltalake::DeltaTableBuilder::from_valid_uri(uri_str)
.context(OpenTableSnafu)
.context(table::LoadDeltaSnafu)?
.with_allow_http(true)
.load()
.await
.context(LoadTableSnafu)
.context(table::LoadDeltaSnafu)?;
let parsed_uri = t.parsed_uri()?;
let url_scheme = parsed_uri.scheme();

View File

@ -3,8 +3,7 @@ mod helpers;
use columnq::datafusion::arrow::datatypes::DataType;
use columnq::table::{TableColumn, TableLoadOption, TableOptionCsv, TableSource};
#[tokio::test]
async fn test_partitioned_csv_table() {
fn partitioned_csv_table() -> TableSource {
let table_path = helpers::test_data_path("partitioned_csv");
let table = TableSource::new("partitioned_csv".to_string(), table_path)
.with_option(TableLoadOption::csv(
@ -23,6 +22,13 @@ async fn test_partitioned_csv_table() {
},
]);
table
}
#[tokio::test]
async fn test_partitioned_csv_table() {
let table = partitioned_csv_table();
let (app, address) = helpers::test_api_app_with_tables(vec![table]).await;
tokio::spawn(app.run_until_stopped());
@ -47,3 +53,28 @@ async fn test_partitioned_csv_table() {
);
assert_eq!(status, 200);
}
#[tokio::test]
async fn test_partitioned_csv_table_filter_by_partition() {
let table = partitioned_csv_table();
let (app, address) = helpers::test_api_app_with_tables(vec![table]).await;
tokio::spawn(app.run_until_stopped());
let response = helpers::http_post(
&format!("{address}/api/sql"),
"SELECT * FROM partitioned_csv WHERE year = '2022' AND month = 12 AND value > 1",
)
.await;
let status = response.status();
let data = response.json::<serde_json::Value>().await.unwrap();
assert_eq!(
data,
serde_json::json!([
{"year": 2022, "month": 12, "ts": 101, "value": 7.8},
{"year": 2022, "month": 12, "ts": 102, "value": 4.0},
])
);
assert_eq!(status, 200);
}

View File

@ -15,17 +15,25 @@ function check_status() {
SQL_ENDPOINT="127.0.0.1:8000/api/sql"
echo "Test s3 blogs..."
http_status=$(curl -o /dev/null -s -w "%{http_code}" -X POST -d "SELECT count(1) from s3_blogs" "${SQL_ENDPOINT}")
check_status ${http_status}
echo "Test s3 blogs space encoded..."
http_status=$(curl -o /dev/null -s -w "%{http_code}" -X POST -d "SELECT count(1) from s3_blogs_space_encode" "${SQL_ENDPOINT}")
check_status ${http_status}
echo "Test s3 blogs in a directory..."
http_status=$(curl -o /dev/null -s -w "%{http_code}" -X POST -d "SELECT count(1) from s3_blogs_dir" "${SQL_ENDPOINT}")
check_status ${http_status}
http_status=$(curl -o /dev/null -s -w "%{http_code}" -X POST -d "SELECT count(*) from s3_delta WHERE reply_id is null" "${SQL_ENDPOINT}")
check_status ${http_status}
echo "Test gcs blogs..."
http_status=$(curl -o /dev/null -s -w "%{http_code}" -X POST -d "SELECT count(1) from gcs_blogs" "${SQL_ENDPOINT}")
check_status ${http_status}
echo "Test azure blogs..."
http_status=$(curl -o /dev/null -s -w "%{http_code}" -X POST -d "SELECT count(1) from azure_blogs" "${SQL_ENDPOINT}")
check_status ${http_status}

View File

@ -21,6 +21,21 @@ tables:
format: "parquet"
# following line is different from test_end_to_end/test_s3_memory.yml
use_memory_table: false
- name: "s3_partitioned"
uri: "s3://test-data/partitioned_blogs/"
partition_columns:
- name: year
data_type: UInt16
- name: month
data_type: UInt8
option:
format: "parquet"
use_memory_table: false
- name: "s3_delta"
uri: "s3://test-data/blogs-delta"
option:
format: "delta"
use_memory_table: false
- name: "gcs_blogs"
uri: "gs://test-data/blogs.parquet"
option:

View File

@ -19,6 +19,21 @@ tables:
option:
format: "parquet"
use_memory_table: true
- name: "s3_partitioned"
uri: "s3://test-data/partitioned_blogs/"
partition_columns:
- name: year
data_type: UInt16
- name: month
data_type: UInt8
option:
format: "parquet"
use_memory_table: true
- name: "s3_delta"
uri: "s3://test-data/blogs-delta"
option:
format: "delta"
use_memory_table: true
- name: "gcs_blogs"
uri: "gs://test-data/blogs.parquet"
option: