From b2d584a74eac74ba7a72f3a88c27368760826c10 Mon Sep 17 00:00:00 2001 From: QP Hou Date: Sun, 23 Mar 2025 18:22:32 -0700 Subject: [PATCH] fix postgres extension parsing (#388) fixes https://github.com/roapi/roapi/issues/387 --- columnq/src/io/fs.rs | 10 ++- columnq/src/table/mod.rs | 150 +++++++++++++++++++++++++++----------- roapi/src/api/drop.rs | 6 +- roapi/src/api/register.rs | 12 +-- 4 files changed, 119 insertions(+), 59 deletions(-) diff --git a/columnq/src/io/fs.rs b/columnq/src/io/fs.rs index ef9190d..2a186d8 100644 --- a/columnq/src/io/fs.rs +++ b/columnq/src/io/fs.rs @@ -72,9 +72,13 @@ where { let fs_path = uri.path().to_string(); let mut file_ext = ".".to_string(); - file_ext.push_str(t.extension().context(TableExtensionSnafu { - table_io_source: t.io_source.clone(), - })?); + file_ext.push_str( + t.extension() + .context(TableExtensionSnafu { + table_io_source: t.io_source.clone(), + })? + .into(), + ); debug!("building file list from path {fs_path}..."); let files = build_file_list(&fs_path, &file_ext).context(FileListSnafu { fs_path, file_ext })?; diff --git a/columnq/src/table/mod.rs b/columnq/src/table/mod.rs index c230fc2..a234c34 100644 --- a/columnq/src/table/mod.rs +++ b/columnq/src/table/mod.rs @@ -93,6 +93,74 @@ pub enum Error { }, } +#[derive(Debug)] +pub enum Extension { + None, + Csv, + Json, + NdJson, + Jsonl, + Parquet, + Arrow, + Arrows, + Xls, + Xlsx, + Xlsb, + Ods, + Sqlite, + Mysql, + Postgresql, +} + +impl From<Extension> for &'static str { + fn from(ext: Extension) -> Self { + match ext { + Extension::None => "", + Extension::Csv => "csv", + Extension::Json => "json", + Extension::NdJson => "ndjson", + Extension::Jsonl => "jsonl", + Extension::Parquet => "parquet", + Extension::Arrow => "arrow", + Extension::Arrows => "arrows", + Extension::Xls => "xls", + Extension::Xlsx => "xlsx", + Extension::Xlsb => "xlsb", + Extension::Ods => "ods", + Extension::Sqlite => "sqlite", + Extension::Mysql => "mysql", + Extension::Postgresql => "postgresql", 
+ } + } +} + +impl TryFrom<&str> for Extension { + type Error = Error; + + fn try_from(ext: &str) -> Result<Self, Self::Error> { + Ok(match ext { + "" => Extension::None, + "csv" => Extension::Csv, + "json" => Extension::Json, + "ndjson" => Extension::NdJson, + "jsonl" => Extension::Jsonl, + "parquet" => Extension::Parquet, + "arrow" => Extension::Arrow, + "arrows" => Extension::Arrows, + "xls" => Extension::Xls, + "xlsx" => Extension::Xlsx, + "xlsb" => Extension::Xlsb, + "ods" => Extension::Ods, + "sqlite" | "sqlite3" | "db" => Extension::Sqlite, + _ => { + return Err(Error::Extension { + msg: format!("unsupported extension {ext}"), + }); + } + }) + } +} + #[derive(Deserialize, Clone, Debug, Eq, PartialEq)] #[serde(deny_unknown_fields)] pub struct TableColumn { @@ -372,23 +440,23 @@ impl TableLoadOption { } } - pub fn extension(&self) -> &'static str { + pub fn extension(&self) -> Extension { match self { - Self::json { .. } => "json", - Self::ndjson { .. } => "ndjson", - Self::jsonl { .. } => "jsonl", - Self::csv { .. } => "csv", - Self::parquet { .. } => "parquet", - Self::google_spreadsheet(_) | Self::delta { .. } => "", - Self::xls { .. } => "xls", - Self::xlsx { .. } => "xlsx", - Self::ods { .. } => "ods", - Self::xlsb { .. } => "xlsb", - Self::arrow { .. } => "arrow", - Self::arrows { .. } => "arrows", - Self::mysql { .. } => "mysql", - Self::sqlite { .. } => "sqlite", - Self::postgres { .. } => "postgres", + Self::json { .. } => Extension::Json, + Self::ndjson { .. } => Extension::NdJson, + Self::jsonl { .. } => Extension::Jsonl, + Self::csv { .. } => Extension::Csv, + Self::parquet { .. } => Extension::Parquet, + Self::google_spreadsheet(_) | Self::delta { .. } => Extension::None, + Self::xls { .. } => Extension::Xls, + Self::xlsx { .. } => Extension::Xlsx, + Self::ods { .. } => Extension::Ods, + Self::xlsb { .. } => Extension::Xlsb, + Self::arrow { .. } => Extension::Arrow, + Self::arrows { .. } => Extension::Arrows, + Self::mysql { .. 
} => Extension::Mysql, + Self::sqlite { .. } => Extension::Sqlite, + Self::postgres { .. } => Extension::Postgresql, } } } @@ -594,27 +662,21 @@ impl TableSource { } } - pub fn extension(&self) -> Result<&str, Error> { + pub fn extension(&self) -> Result<Extension, Error> { Ok(match (&self.option, &self.io_source) { (Some(opt), _) => opt.extension(), (None, TableIoSource::Uri(uri)) => { - match Path::new(uri).extension().and_then(OsStr::to_str) { - Some(ext) => match ext { - "csv" | "json" | "ndjson" | "jsonl" | "parquet" | "arrow" | "arrows" - | "xls" | "xlsx" | "xlsb" | "ods" => ext, - "sqlite" | "sqlite3" | "db" => "sqlite", - _ => { - return Err(Error::Extension { - msg: format!("unsupported extension {ext} in uri: {uri}"), - }); - } - }, + let uri_str = Path::new(uri).extension().and_then(OsStr::to_str); + match uri_str { + Some(ext) => ext.try_into().map_err(|_| Error::Extension { + msg: format!("unsupported extension {ext} in uri: {uri}"), + })?, None => { // database sources doesn't have suffix extension, parse scheme instead match TableSource::parse_option(&self.io_source) { - Some(TableLoadOption::mysql { .. }) => "mysql", - Some(TableLoadOption::sqlite { .. }) => "sqlite", - Some(TableLoadOption::postgres { .. }) => "postgres", + Some(TableLoadOption::mysql { .. }) => Extension::Mysql, + Some(TableLoadOption::sqlite { .. }) => Extension::Sqlite, + Some(TableLoadOption::postgres { .. }) => Extension::Postgresql, _ => { return Err(Error::Extension { msg: format!("unsupported extension in uri: {uri}"), @@ -752,25 +814,29 @@ pub async fn load( }) } else { match t.extension()? 
{ - "csv" => csv::to_loaded_table(t.clone(), dfctx.clone()).await, - "json" => json::to_loaded_table(t.clone(), dfctx.clone()).await, - "ndjson" | "jsonl" => ndjson::to_loaded_table(t.clone(), dfctx.clone()).await, - "parquet" => parquet::to_loaded_table(t.clone(), dfctx.clone()).await, - "xls" | "xlsx" | "xlsb" | "ods" => excel::to_loaded_table(t.clone()).await, - "arrow" => arrow_ipc_file::to_loaded_table(t.clone(), dfctx.clone()).await, - "arrows" => arrow_ipc_stream::to_loaded_table(t.clone(), dfctx.clone()).await, - "mysql" => Ok(LoadedTable::new_from_df_table(Arc::new( + Extension::Csv => csv::to_loaded_table(t.clone(), dfctx.clone()).await, + Extension::Json => json::to_loaded_table(t.clone(), dfctx.clone()).await, + Extension::NdJson | Extension::Jsonl => { + ndjson::to_loaded_table(t.clone(), dfctx.clone()).await + } + Extension::Parquet => parquet::to_loaded_table(t.clone(), dfctx.clone()).await, + Extension::Xls | Extension::Xlsx | Extension::Xlsb | Extension::Ods => { + excel::to_loaded_table(t.clone()).await + } + Extension::Arrow => arrow_ipc_file::to_loaded_table(t.clone(), dfctx.clone()).await, + Extension::Arrows => arrow_ipc_stream::to_loaded_table(t.clone(), dfctx.clone()).await, + Extension::Mysql => Ok(LoadedTable::new_from_df_table(Arc::new( database::DatabaseLoader::MySQL.to_mem_table(t)?, ))), - "sqlite" => Ok(LoadedTable::new_from_df_table(Arc::new( + Extension::Sqlite => Ok(LoadedTable::new_from_df_table(Arc::new( database::DatabaseLoader::SQLite.to_mem_table(t)?, ))), - "postgresql" => Ok(LoadedTable::new_from_df_table(Arc::new( + Extension::Postgresql => Ok(LoadedTable::new_from_df_table(Arc::new( database::DatabaseLoader::Postgres.to_mem_table(t)?, ))), ext => Err(Error::InvalidUri { msg: format!( - "failed to register `{}` as table `{}`, unsupported table format `{}`", + "failed to register `{}` as table `{}`, unsupported table format `{:?}`", t.io_source, t.name, ext, ), }), diff --git a/roapi/src/api/drop.rs b/roapi/src/api/drop.rs 
index ba4ef20..f457f86 100644 --- a/roapi/src/api/drop.rs +++ b/roapi/src/api/drop.rs @@ -1,7 +1,6 @@ use std::{collections::HashMap, sync::Arc}; use axum::extract::{Extension, Json}; -use columnq::error::ColumnQError; use columnq::table::TableSource; use log::info; use serde::Deserialize; @@ -25,10 +24,7 @@ pub async fn drop_table( for config in body { if let Some(t) = tables.get(&config.table_name) { info!("dropping table `{}`", t.name); - ctx.drop_table(t) - .await - .map_err(ColumnQError::from) - .map_err(ApiErrResp::drop_table)?; + ctx.drop_table(t).await.map_err(ApiErrResp::drop_table)?; tables.remove(&config.table_name); info!("dropped table `{}`", config.table_name); } else { diff --git a/roapi/src/api/register.rs b/roapi/src/api/register.rs index cf4629a..bc173d4 100644 --- a/roapi/src/api/register.rs +++ b/roapi/src/api/register.rs @@ -1,7 +1,7 @@ use std::{collections::HashMap, sync::Arc}; use axum::extract::{Extension, Json}; -use columnq::{error::ColumnQError, table::TableSource}; +use columnq::table::TableSource; use log::info; use serde::Deserialize; use tokio::sync::Mutex; @@ -26,10 +26,7 @@ pub async fn register_table( if let Some(ref uri) = config.uri { let t = TableSource::new_with_uri(&config.table_name, uri); info!("loading `{}` as table `{}`", t.io_source, config.table_name); - ctx.load_table(&t) - .await - .map_err(ColumnQError::from) - .map_err(ApiErrResp::load_table)?; + ctx.load_table(&t).await.map_err(ApiErrResp::load_table)?; tables.insert(config.table_name.clone(), t.clone()); info!( "registered `{}` as table `{}`", @@ -37,10 +34,7 @@ pub async fn register_table( ); } else if let Some(t) = tables.get(&config.table_name) { info!("Re register table {}", t.name); - ctx.load_table(t) - .await - .map_err(ColumnQError::from) - .map_err(ApiErrResp::load_table)?; + ctx.load_table(t).await.map_err(ApiErrResp::load_table)?; } else { return Err(ApiErrResp::register_table(format!( "Table `{}` source not exists",