mirror of
https://github.com/roapi/roapi.git
synced 2026-06-05 21:04:02 +08:00
replace s3 table source to unified io:object_store (#244)
This commit is contained in:
parent
1e3644ec36
commit
9db6b418cb
2
.github/workflows/build.yml
vendored
2
.github/workflows/build.yml
vendored
@ -150,6 +150,7 @@ jobs:
|
||||
$HOME/minio-binaries/mc alias set local http://127.0.0.1:9000 minioadmin minioadmin
|
||||
$HOME/minio-binaries/mc mb local/test-data
|
||||
$HOME/minio-binaries/mc cp test_data/blogs.parquet local/test-data
|
||||
$HOME/minio-binaries/mc cp test_data/blogs.parquet local/test-data/blogs/
|
||||
- name: Start roapi and Query
|
||||
env:
|
||||
AWS_DEFAULT_REGION: "us-east-1"
|
||||
@ -203,6 +204,7 @@ jobs:
|
||||
$HOME/minio-binaries/mc alias set local http://127.0.0.1:9000 minioadmin minioadmin
|
||||
$HOME/minio-binaries/mc mb local/test-data
|
||||
$HOME/minio-binaries/mc cp test_data/blogs.parquet local/test-data
|
||||
$HOME/minio-binaries/mc cp test_data/blogs.parquet local/test-data/blogs/
|
||||
- name: Start roapi and Query
|
||||
env:
|
||||
AWS_DEFAULT_REGION: "us-east-1"
|
||||
|
||||
@ -42,6 +42,12 @@ pub enum ColumnQError {
|
||||
#[error("Error loading data from S3 store: {0}")]
|
||||
S3Store(String),
|
||||
|
||||
#[error("DataFusion error: {source}")]
|
||||
ObjectStore {
|
||||
#[from]
|
||||
source: object_store::Error,
|
||||
},
|
||||
|
||||
#[error("DeltaTable error: {source}")]
|
||||
DeltaTable {
|
||||
#[from]
|
||||
|
||||
@ -4,6 +4,7 @@ pub mod fs;
|
||||
pub mod http;
|
||||
pub mod memory;
|
||||
pub mod s3;
|
||||
pub mod object_store;
|
||||
|
||||
use crate::error::ColumnQError;
|
||||
|
||||
|
||||
62
columnq/src/io/object_store.rs
Normal file
62
columnq/src/io/object_store.rs
Normal file
@ -0,0 +1,62 @@
|
||||
use futures::TryStreamExt;
|
||||
use std::str::FromStr;
|
||||
use url::Url;
|
||||
use crate::table::TableSource;
|
||||
use uriparse::URIReference;
|
||||
use datafusion::datasource::object_store::ObjectStoreProvider;
|
||||
use std::sync::Arc;
|
||||
use crate::error::ColumnQError;
|
||||
use object_store::ObjectStore;
|
||||
use crate::columnq::ColumnQObjectStoreProvider;
|
||||
|
||||
pub async fn partition_key_to_reader(
|
||||
client: Arc<dyn ObjectStore>,
|
||||
path: &object_store::path::Path,
|
||||
) -> Result<std::io::Cursor<Vec<u8>>, ColumnQError> {
|
||||
let get_result = client
|
||||
.get(path)
|
||||
.await?;
|
||||
let bytes = get_result
|
||||
.bytes()
|
||||
.await?;
|
||||
Ok(std::io::Cursor::new(bytes.to_vec()))
|
||||
}
|
||||
|
||||
pub async fn partitions_from_uri<'a, F, T>(
|
||||
t: &'a TableSource,
|
||||
_uri: URIReference<'a>,
|
||||
mut partition_reader: F,
|
||||
) -> Result<Vec<T>, ColumnQError>
|
||||
where
|
||||
F: FnMut(std::io::Cursor<Vec<u8>>) -> Result<T, ColumnQError>,
|
||||
{
|
||||
let object_store_provider = ColumnQObjectStoreProvider {};
|
||||
let url = &Url::from_str(t.get_uri_str()).unwrap();
|
||||
let client = object_store_provider.get_by_url(url)?;
|
||||
let mut partitions = vec![];
|
||||
|
||||
// first try loading table uri as single object
|
||||
// url.path starts with "/", but object_store does not expect "/" at the beginning
|
||||
let path = object_store::path::Path::from(&url.path()[1..]);
|
||||
match partition_key_to_reader(client.clone(), &path).await {
|
||||
Ok(reader) => {
|
||||
partitions.push(partition_reader(reader)?);
|
||||
}
|
||||
Err(_) => {
|
||||
// fallback to directory listing
|
||||
let paths = client.clone()
|
||||
.list(Some(&path))
|
||||
.await?
|
||||
.map_ok(|meta| meta.location)
|
||||
.try_collect::<Vec<object_store::path::Path>>()
|
||||
.await
|
||||
.unwrap();
|
||||
for f in paths {
|
||||
let reader = partition_key_to_reader(client.clone(), &f).await?;
|
||||
partitions.push(partition_reader(reader)?);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(partitions)
|
||||
}
|
||||
@ -19,7 +19,7 @@ macro_rules! partitions_from_table_source {
|
||||
io::http::partitions_from_uri(&$table_source, uri, $call_with_r).await
|
||||
}
|
||||
io::BlobStoreType::S3 => {
|
||||
io::s3::partitions_from_uri(&$table_source, uri, $call_with_r).await
|
||||
io::object_store::partitions_from_uri(&$table_source, uri, $call_with_r).await
|
||||
}
|
||||
io::BlobStoreType::Memory => {
|
||||
io::memory::partitions_from_memory(&$table_source, $call_with_r).await
|
||||
|
||||
@ -8,5 +8,17 @@ then
|
||||
exit 1
|
||||
else
|
||||
echo "success"
|
||||
exit 0
|
||||
fi
|
||||
|
||||
http_status=$(curl -o /dev/null -s -w "%{http_code}" -X POST -d "SELECT count(1) from blogs_dir" 127.0.0.1:8000/api/sql)
|
||||
|
||||
echo $http_status
|
||||
if [[ $http_status != "200" ]]
|
||||
then
|
||||
echo "error"
|
||||
exit 1
|
||||
else
|
||||
echo "success"
|
||||
fi
|
||||
|
||||
exit 0
|
||||
|
||||
@ -10,3 +10,9 @@ tables:
|
||||
format: "parquet"
|
||||
# following line is different from test_end_to_end/test_s3_memory.yml
|
||||
use_memory_table: false
|
||||
- name: "blogs_dir"
|
||||
uri: "s3://test-data/blogs/"
|
||||
option:
|
||||
format: "parquet"
|
||||
# following line is different from test_end_to_end/test_s3_memory.yml
|
||||
use_memory_table: false
|
||||
|
||||
@ -9,3 +9,8 @@ tables:
|
||||
option:
|
||||
format: "parquet"
|
||||
use_memory_table: true
|
||||
- name: "blogs_dir"
|
||||
uri: "s3://test-data/blogs/"
|
||||
option:
|
||||
format: "parquet"
|
||||
use_memory_table: true
|
||||
|
||||
Loading…
Reference in New Issue
Block a user