diff --git a/columnq/src/table/database.rs b/columnq/src/table/database.rs index 5fbe6a6..cf21d07 100644 --- a/columnq/src/table/database.rs +++ b/columnq/src/table/database.rs @@ -10,14 +10,11 @@ pub enum DatabaseLoader { feature = "database-postgres" ))] mod imp { + use std::convert::TryFrom; + use crate::error::ColumnQError; use crate::table::TableSource; use connectorx::prelude::*; - #[cfg(any(feature = "database-mysql"))] - use connectorx::sources::mysql; - #[cfg(any(feature = "database-postgres"))] - use connectorx::sources::postgres; - use datafusion::arrow::record_batch::RecordBatch; use log::debug; use super::DatabaseLoader; @@ -28,125 +25,15 @@ mod imp { t: &TableSource, ) -> Result { debug!("loading database table data..."); - let queries = &[format!("SELECT * FROM {}", t.name)]; - let mut destination = ArrowDestination::new(); - match self { - DatabaseLoader::MySQL => { - #[cfg(feature = "database-mysql")] - { - let source = MySQLSource::::new(t.get_uri_str(), 2) - .map_err(|e| ColumnQError::Database(e.to_string()))?; - let dispatcher = Dispatcher::< - MySQLSource, - ArrowDestination, - MySQLArrowTransport, - >::new( - source, &mut destination, queries, None - ); - dispatcher - .run() - .map_err(|e| ColumnQError::Database(e.to_string()))?; - let schema_ref = destination.arrow_schema(); - let data: Vec = destination.arrow().unwrap(); - Ok(datafusion::datasource::MemTable::try_new( - schema_ref, - vec![data], - )?) - } - #[cfg(not(feature = "database-mysql"))] - { - return Err(ColumnQError::Database( - "MySQL database feature not enabled.".to_string(), - )); - } - } - DatabaseLoader::SQLite => { - #[cfg(feature = "database-sqlite")] - { - let uri = t.get_uri_str().replace("sqlite://", ""); - let source = SQLiteSource::new(&uri, 2) - .map_err(|e| ColumnQError::Database(e.to_string()))?; - let dispatcher = Dispatcher::< - SQLiteSource, - ArrowDestination, - SQLiteArrowTransport, - >::new( - source, &mut destination, queries, None - ); - dispatcher - .run() - .map_err(|e| ColumnQError::Database(e.to_string()))?; - - let schema_ref = destination.arrow_schema(); - let data: Vec = destination.arrow().unwrap(); - Ok(datafusion::datasource::MemTable::try_new( - schema_ref, - vec![data], - )?) - } - #[cfg(not(feature = "database-sqlite"))] - { - return Err(ColumnQError::Database( - "SQLite database feature not enabled.".to_string(), - )); - } - } - DatabaseLoader::Postgres => { - #[cfg(feature = "database-postgres")] - { - let config = t - .get_uri_str() - .parse::() - .map_err(|e| ColumnQError::Database(e.to_string()))?; - let tls = match config.get_ssl_mode() { - tokio_postgres::config::SslMode::Require => tokio_postgres::NoTls, - _ => tokio_postgres::NoTls, - }; - let source: PostgresSource< - postgres::BinaryProtocol, - tokio_postgres::NoTls, - > = PostgresSource::new(config.into(), tls, 2) - .map_err(|e| ColumnQError::Database(e.to_string()))?; - let queries = queries.clone(); - let task = tokio::task::spawn_blocking(move || { - let dispatcher = Dispatcher::< - PostgresSource, - ArrowDestination, - PostgresArrowTransport< - postgres::BinaryProtocol, - tokio_postgres::NoTls, - >, - >::new( - source, &mut destination, &queries, None - ); - - if let Err(e) = dispatcher.run() { - return Err(ColumnQError::Database(e.to_string())); - } - - let schema_ref = destination.arrow_schema(); - match destination.arrow() { - Ok(data) => datafusion::datasource::MemTable::try_new( - schema_ref, - vec![data], - ) - .map_err(|e| ColumnQError::Database(e.to_string())), - Err(e) => Err(ColumnQError::Database(e.to_string())), - } - }); - - // FIXME: Maybe use other way to block the async task instead of block_on - futures::executor::block_on(task) - .map_err(|e| ColumnQError::Database(e.to_string()))? - } - #[cfg(not(feature = "database-postgres"))] - { - return Err(ColumnQError::Database( - "Postgres database feature not enabled.".to_string(), - )); - } - } - } + let queries = CXQuery::naked(format!("SELECT * FROM {}", t.name)); + let source = SourceConn::try_from(t.get_uri_str()) + .map_err(|e| ColumnQError::Database(e.to_string()))?; + let destination = connectorx::get_arrow::get_arrow(&source, None, &[queries]) + .map_err(|e| ColumnQError::Database(e.to_string()))?; + Ok(datafusion::datasource::MemTable::try_new( + destination.arrow_schema(), + vec![destination.arrow().unwrap()], + )?) } } }