fix postgres extension parsing (#388)

fixes https://github.com/roapi/roapi/issues/387
This commit is contained in:
QP Hou 2025-03-23 18:22:32 -07:00 committed by GitHub
parent 479a0dcaf8
commit b2d584a74e
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 119 additions and 59 deletions

View File

@ -72,9 +72,13 @@ where
{
let fs_path = uri.path().to_string();
let mut file_ext = ".".to_string();
file_ext.push_str(t.extension().context(TableExtensionSnafu {
table_io_source: t.io_source.clone(),
})?);
file_ext.push_str(
t.extension()
.context(TableExtensionSnafu {
table_io_source: t.io_source.clone(),
})?
.into(),
);
debug!("building file list from path {fs_path}...");
let files =
build_file_list(&fs_path, &file_ext).context(FileListSnafu { fs_path, file_ext })?;

View File

@ -93,6 +93,74 @@ pub enum Error {
},
}
#[derive(Debug)]
pub enum Extension {
None,
Csv,
Json,
NdJson,
Jsonl,
Parquet,
Arrow,
Arrows,
Xls,
Xlsx,
Xlsb,
Ods,
Sqlite,
Mysql,
Postgresql,
}
impl From<Extension> for &'static str {
fn from(ext: Extension) -> Self {
match ext {
Extension::None => "",
Extension::Csv => "csv",
Extension::Json => "json",
Extension::NdJson => "ndjson",
Extension::Jsonl => "jsonl",
Extension::Parquet => "parquet",
Extension::Arrow => "arrow",
Extension::Arrows => "arrows",
Extension::Xls => "xls",
Extension::Xlsx => "xlsx",
Extension::Xlsb => "xlsb",
Extension::Ods => "ods",
Extension::Sqlite => "sqlite",
Extension::Mysql => "mysql",
Extension::Postgresql => "postgresql",
}
}
}
impl TryFrom<&str> for Extension {
type Error = Error;
fn try_from(ext: &str) -> Result<Self, Error> {
Ok(match ext {
"" => Extension::None,
"csv" => Extension::Csv,
"json" => Extension::Json,
"ndjson" => Extension::NdJson,
"jsonl" => Extension::Jsonl,
"parquet" => Extension::Parquet,
"arrow" => Extension::Arrow,
"arrows" => Extension::Arrows,
"xls" => Extension::Xls,
"xlsx" => Extension::Xlsx,
"xlsb" => Extension::Xlsb,
"ods" => Extension::Ods,
"sqlite" | "sqlite3" | "db" => Extension::Sqlite,
_ => {
return Err(Error::Extension {
msg: format!("unsupported extension {ext}"),
});
}
})
}
}
#[derive(Deserialize, Clone, Debug, Eq, PartialEq)]
#[serde(deny_unknown_fields)]
pub struct TableColumn {
@ -372,23 +440,23 @@ impl TableLoadOption {
}
}
pub fn extension(&self) -> &'static str {
pub fn extension(&self) -> Extension {
match self {
Self::json { .. } => "json",
Self::ndjson { .. } => "ndjson",
Self::jsonl { .. } => "jsonl",
Self::csv { .. } => "csv",
Self::parquet { .. } => "parquet",
Self::google_spreadsheet(_) | Self::delta { .. } => "",
Self::xls { .. } => "xls",
Self::xlsx { .. } => "xlsx",
Self::ods { .. } => "ods",
Self::xlsb { .. } => "xlsb",
Self::arrow { .. } => "arrow",
Self::arrows { .. } => "arrows",
Self::mysql { .. } => "mysql",
Self::sqlite { .. } => "sqlite",
Self::postgres { .. } => "postgres",
Self::json { .. } => Extension::Json,
Self::ndjson { .. } => Extension::NdJson,
Self::jsonl { .. } => Extension::Jsonl,
Self::csv { .. } => Extension::Csv,
Self::parquet { .. } => Extension::Parquet,
Self::google_spreadsheet(_) | Self::delta { .. } => Extension::None,
Self::xls { .. } => Extension::Xls,
Self::xlsx { .. } => Extension::Xlsx,
Self::ods { .. } => Extension::Ods,
Self::xlsb { .. } => Extension::Xlsb,
Self::arrow { .. } => Extension::Arrow,
Self::arrows { .. } => Extension::Arrows,
Self::mysql { .. } => Extension::Mysql,
Self::sqlite { .. } => Extension::Sqlite,
Self::postgres { .. } => Extension::Postgresql,
}
}
}
@ -594,27 +662,21 @@ impl TableSource {
}
}
pub fn extension(&self) -> Result<&str, Error> {
pub fn extension(&self) -> Result<Extension, Error> {
Ok(match (&self.option, &self.io_source) {
(Some(opt), _) => opt.extension(),
(None, TableIoSource::Uri(uri)) => {
match Path::new(uri).extension().and_then(OsStr::to_str) {
Some(ext) => match ext {
"csv" | "json" | "ndjson" | "jsonl" | "parquet" | "arrow" | "arrows"
| "xls" | "xlsx" | "xlsb" | "ods" => ext,
"sqlite" | "sqlite3" | "db" => "sqlite",
_ => {
return Err(Error::Extension {
msg: format!("unsupported extension {ext} in uri: {uri}"),
});
}
},
let uri_str = Path::new(uri).extension().and_then(OsStr::to_str);
match uri_str {
Some(ext) => ext.try_into().map_err(|_| Error::Extension {
msg: format!("unsupported extension {ext} in uri: {uri}"),
})?,
None => {
// database sources doesn't have suffix extension, parse scheme instead
match TableSource::parse_option(&self.io_source) {
Some(TableLoadOption::mysql { .. }) => "mysql",
Some(TableLoadOption::sqlite { .. }) => "sqlite",
Some(TableLoadOption::postgres { .. }) => "postgres",
Some(TableLoadOption::mysql { .. }) => Extension::Mysql,
Some(TableLoadOption::sqlite { .. }) => Extension::Sqlite,
Some(TableLoadOption::postgres { .. }) => Extension::Postgresql,
_ => {
return Err(Error::Extension {
msg: format!("unsupported extension in uri: {uri}"),
@ -752,25 +814,29 @@ pub async fn load(
})
} else {
match t.extension()? {
"csv" => csv::to_loaded_table(t.clone(), dfctx.clone()).await,
"json" => json::to_loaded_table(t.clone(), dfctx.clone()).await,
"ndjson" | "jsonl" => ndjson::to_loaded_table(t.clone(), dfctx.clone()).await,
"parquet" => parquet::to_loaded_table(t.clone(), dfctx.clone()).await,
"xls" | "xlsx" | "xlsb" | "ods" => excel::to_loaded_table(t.clone()).await,
"arrow" => arrow_ipc_file::to_loaded_table(t.clone(), dfctx.clone()).await,
"arrows" => arrow_ipc_stream::to_loaded_table(t.clone(), dfctx.clone()).await,
"mysql" => Ok(LoadedTable::new_from_df_table(Arc::new(
Extension::Csv => csv::to_loaded_table(t.clone(), dfctx.clone()).await,
Extension::Json => json::to_loaded_table(t.clone(), dfctx.clone()).await,
Extension::NdJson | Extension::Jsonl => {
ndjson::to_loaded_table(t.clone(), dfctx.clone()).await
}
Extension::Parquet => parquet::to_loaded_table(t.clone(), dfctx.clone()).await,
Extension::Xls | Extension::Xlsx | Extension::Xlsb | Extension::Ods => {
excel::to_loaded_table(t.clone()).await
}
Extension::Arrow => arrow_ipc_file::to_loaded_table(t.clone(), dfctx.clone()).await,
Extension::Arrows => arrow_ipc_stream::to_loaded_table(t.clone(), dfctx.clone()).await,
Extension::Mysql => Ok(LoadedTable::new_from_df_table(Arc::new(
database::DatabaseLoader::MySQL.to_mem_table(t)?,
))),
"sqlite" => Ok(LoadedTable::new_from_df_table(Arc::new(
Extension::Sqlite => Ok(LoadedTable::new_from_df_table(Arc::new(
database::DatabaseLoader::SQLite.to_mem_table(t)?,
))),
"postgresql" => Ok(LoadedTable::new_from_df_table(Arc::new(
Extension::Postgresql => Ok(LoadedTable::new_from_df_table(Arc::new(
database::DatabaseLoader::Postgres.to_mem_table(t)?,
))),
ext => Err(Error::InvalidUri {
msg: format!(
"failed to register `{}` as table `{}`, unsupported table format `{}`",
"failed to register `{}` as table `{}`, unsupported table format `{:?}`",
t.io_source, t.name, ext,
),
}),

View File

@ -1,7 +1,6 @@
use std::{collections::HashMap, sync::Arc};
use axum::extract::{Extension, Json};
use columnq::error::ColumnQError;
use columnq::table::TableSource;
use log::info;
use serde::Deserialize;
@ -25,10 +24,7 @@ pub async fn drop_table<H: RoapiContext>(
for config in body {
if let Some(t) = tables.get(&config.table_name) {
info!("dropping table `{}`", t.name);
ctx.drop_table(t)
.await
.map_err(ColumnQError::from)
.map_err(ApiErrResp::drop_table)?;
ctx.drop_table(t).await.map_err(ApiErrResp::drop_table)?;
tables.remove(&config.table_name);
info!("dropped table `{}`", config.table_name);
} else {

View File

@ -1,7 +1,7 @@
use std::{collections::HashMap, sync::Arc};
use axum::extract::{Extension, Json};
use columnq::{error::ColumnQError, table::TableSource};
use columnq::table::TableSource;
use log::info;
use serde::Deserialize;
use tokio::sync::Mutex;
@ -26,10 +26,7 @@ pub async fn register_table<H: RoapiContext>(
if let Some(ref uri) = config.uri {
let t = TableSource::new_with_uri(&config.table_name, uri);
info!("loading `{}` as table `{}`", t.io_source, config.table_name);
ctx.load_table(&t)
.await
.map_err(ColumnQError::from)
.map_err(ApiErrResp::load_table)?;
ctx.load_table(&t).await.map_err(ApiErrResp::load_table)?;
tables.insert(config.table_name.clone(), t.clone());
info!(
"registered `{}` as table `{}`",
@ -37,10 +34,7 @@ pub async fn register_table<H: RoapiContext>(
);
} else if let Some(t) = tables.get(&config.table_name) {
info!("Re register table {}", t.name);
ctx.load_table(t)
.await
.map_err(ColumnQError::from)
.map_err(ApiErrResp::load_table)?;
ctx.load_table(t).await.map_err(ApiErrResp::load_table)?;
} else {
return Err(ApiErrResp::register_table(format!(
"Table `{}` source not exists",