feat: support specifying table name in database table load cfg (#371)
Some checks failed
build / build (push) Has been cancelled
build / database_test (push) Has been cancelled
build / object_store_memory_test (push) Has been cancelled
build / object_store_direct_test (push) Has been cancelled
build / openssl_build (push) Has been cancelled
build / mac_cross_build (push) Has been cancelled
build / Docker Image Build (push) Has been cancelled
columnq-cli release / Validate git tag (push) Has been cancelled
roapi release / Validate git tag (push) Has been cancelled
columnq-cli release / macos (push) Has been cancelled
columnq-cli release / windows (map[features:database-sqlite python-architecture:x64 target:x86_64-pc-windows-msvc]) (push) Has been cancelled
columnq-cli release / linux (map[features:rustls,database-sqlite image_tag:aarch64-musl manylinux:2014 name_suffix: rustflags: target:aarch64-unknown-linux-musl upload:true]) (push) Has been cancelled
columnq-cli release / linux (map[features:rustls,database-sqlite image_tag:x86_64-musl manylinux:2010 name_suffix: rustflags:-C target-cpu=skylake target:x86_64-unknown-linux-musl upload:true]) (push) Has been cancelled
columnq-cli release / PyPI Release (push) Has been cancelled
roapi release / macos (push) Has been cancelled
roapi release / windows (map[features:database-sqlite python-architecture:x64 target:x86_64-pc-windows-msvc]) (push) Has been cancelled
roapi release / linux (map[features:rustls,database-sqlite image_tag:aarch64-musl manylinux:2014 name_suffix: rustflags: target:aarch64-unknown-linux-musl upload:true]) (push) Has been cancelled
roapi release / linux (map[features:rustls,database-sqlite image_tag:x86_64-musl manylinux:2010 name_suffix: rustflags:-C target-cpu=skylake target:x86_64-unknown-linux-musl upload:true]) (push) Has been cancelled
roapi release / PyPI Release (push) Has been cancelled
roapi release / Docker Image Release (push) Has been cancelled

This commit is contained in:
QP Hou 2025-01-04 01:57:34 -08:00 committed by GitHub
parent 809521124f
commit 3d6ffa796c
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 49 additions and 10 deletions

View File

@ -10,6 +10,7 @@ pub enum DatabaseLoader {
feature = "database-postgres"
))]
mod imp {
use crate::table::TableLoadOption;
use crate::table::{self, TableSource};
use connectorx::prelude::*;
use log::debug;
@ -39,7 +40,16 @@ mod imp {
t: &TableSource,
) -> Result<datafusion::datasource::MemTable, table::Error> {
debug!("loading database table data...");
let queries = CXQuery::naked(format!("SELECT * FROM {}", t.name));
let table_name = match &t.option {
Some(TableLoadOption::mysql { table }) => table.clone(),
Some(TableLoadOption::postgres { table }) => table.clone(),
Some(TableLoadOption::sqlite { table }) => table.clone(),
_ => None,
}
.unwrap_or(t.name.clone());
let queries = CXQuery::naked(format!("SELECT * FROM {}", table_name));
let source = SourceConn::try_from(t.get_uri_str())
.context(SourceSnafu)
.context(table::LoadDatabaseSnafu)?;

View File

@ -323,9 +323,15 @@ pub enum TableLoadOption {
delta(TableOptionDelta),
arrow {},
arrows {},
mysql {},
sqlite {},
postgres {},
mysql {
table: Option<String>,
},
sqlite {
table: Option<String>,
},
postgres {
table: Option<String>,
},
}
impl TableLoadOption {
@ -420,6 +426,10 @@ impl std::fmt::Display for TableIoSource {
}
}
fn table_name_from_path(path: &uriparse::Path) -> Option<String> {
Some(path.segments()[0].to_string())
}
#[derive(Deserialize, Clone, Debug, Eq, PartialEq)]
#[serde(deny_unknown_fields)]
pub struct TableSource {
@ -542,9 +552,16 @@ impl TableSource {
let uri = URIReference::try_from(uri.as_str()).ok()?;
let scheme = uri.scheme()?;
match scheme.as_str() {
"mysql" => Some(TableLoadOption::mysql {}),
"sqlite" => Some(TableLoadOption::sqlite {}),
"postgresql" => Some(TableLoadOption::postgres {}),
"mysql" => Some(TableLoadOption::mysql {
table: table_name_from_path(uri.path()),
}),
"sqlite" => Some(TableLoadOption::sqlite {
// for sqlite, db uri only contains the path to the db file
table: None,
}),
"postgresql" => Some(TableLoadOption::postgres {
table: table_name_from_path(uri.path()),
}),
_ => None,
}
}
@ -595,9 +612,9 @@ impl TableSource {
None => {
// database sources doesn't have suffix extension, parse scheme instead
match TableSource::parse_option(&self.io_source) {
Some(TableLoadOption::mysql {}) => "mysql",
Some(TableLoadOption::sqlite {}) => "sqlite",
Some(TableLoadOption::postgres {}) => "postgres",
Some(TableLoadOption::mysql { .. }) => "mysql",
Some(TableLoadOption::sqlite { .. }) => "sqlite",
Some(TableLoadOption::postgres { .. }) => "postgres",
_ => {
return Err(Error::Extension {
msg: format!("unsupported extension in uri: {uri}"),
@ -961,6 +978,18 @@ schema:
);
}
#[test]
fn test_table_name_from_path() {
assert_eq!(
table_name_from_path(
&URIReference::try_from("mysql://root:123456@1.1.1.1:3306/test")
.unwrap()
.path()
),
Some("test".to_string()),
);
}
#[cfg(feature = "database-sqlite")]
#[tokio::test]
async fn test_load_sqlite_table() {