diff --git a/columnq/src/table/mod.rs b/columnq/src/table/mod.rs index db6d5a5..95fec0c 100644 --- a/columnq/src/table/mod.rs +++ b/columnq/src/table/mod.rs @@ -412,23 +412,29 @@ impl TableSource { Ok(match (&self.option, &self.io_source) { (Some(opt), _) => opt.extension(), (None, TableIoSource::Uri(uri)) => { - let ext = Path::new(uri) - .extension() - .and_then(OsStr::to_str) - .ok_or_else(|| { - ColumnQError::InvalidUri(format!( - "cannot detect table extension from uri: {}", - uri - )) - })?; - - match ext { - "csv" | "json" | "parquet" | "ndjson" | "arrow" | "arrows" => ext, - _ => { - return Err(ColumnQError::InvalidUri(format!( - "unsupported extension in uri: {}", - uri - ))); + match Path::new(uri).extension().and_then(OsStr::to_str) { + Some(ext) => match ext { + "csv" | "json" | "parquet" | "ndjson" | "arrow" | "arrows" => ext, + _ => { + return Err(ColumnQError::InvalidUri(format!( + "unsupported extension in uri: {}", + uri + ))); + } + }, + None => { + // database sources doesn't have suffix extension, parse scheme instead + match TableSource::parse_option(&self.io_source) { + Some(TableLoadOption::mysql {}) => "mysql", + Some(TableLoadOption::sqlite {}) => "sqlite", + Some(TableLoadOption::postgres {}) => "postgres", + _ => { + return Err(ColumnQError::InvalidUri(format!( + "unsupported extension in uri: {}", + uri + ))); + } + } } } } @@ -472,6 +478,9 @@ pub async fn load(t: &TableSource) -> Result, ColumnQErro "parquet" => parquet::to_datafusion_table(t).await?, "arrow" => Arc::new(arrow_ipc_file::to_mem_table(t).await?), "arrows" => Arc::new(arrow_ipc_stream::to_mem_table(t).await?), + "mysql" => Arc::new(database::DatabaseLoader::MySQL.to_mem_table(t)?), + "sqlite" => Arc::new(database::DatabaseLoader::SQLite.to_mem_table(t)?), + "postgresql" => Arc::new(database::DatabaseLoader::Postgres.to_mem_table(t)?), ext => { return Err(ColumnQError::InvalidUri(format!( "failed to register `{}` as table `{}`, unsupported table format `{}`",