Refactor the code that reads tables from the database (#293)

This commit is contained in:
Joe 2023-09-11 14:42:28 +08:00 committed by GitHub
parent ff3617767d
commit 5348f68d6e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -10,14 +10,11 @@ pub enum DatabaseLoader {
feature = "database-postgres"
))]
mod imp {
use std::convert::TryFrom;
use crate::error::ColumnQError;
use crate::table::TableSource;
use connectorx::prelude::*;
#[cfg(any(feature = "database-mysql"))]
use connectorx::sources::mysql;
#[cfg(any(feature = "database-postgres"))]
use connectorx::sources::postgres;
use datafusion::arrow::record_batch::RecordBatch;
use log::debug;
use super::DatabaseLoader;
@ -28,125 +25,15 @@ mod imp {
t: &TableSource,
) -> Result<datafusion::datasource::MemTable, ColumnQError> {
debug!("loading database table data...");
let queries = &[format!("SELECT * FROM {}", t.name)];
let mut destination = ArrowDestination::new();
match self {
DatabaseLoader::MySQL => {
#[cfg(feature = "database-mysql")]
{
let source = MySQLSource::<mysql::BinaryProtocol>::new(t.get_uri_str(), 2)
.map_err(|e| ColumnQError::Database(e.to_string()))?;
let dispatcher = Dispatcher::<
MySQLSource<mysql::BinaryProtocol>,
ArrowDestination,
MySQLArrowTransport<mysql::BinaryProtocol>,
>::new(
source, &mut destination, queries, None
);
dispatcher
.run()
.map_err(|e| ColumnQError::Database(e.to_string()))?;
let schema_ref = destination.arrow_schema();
let data: Vec<RecordBatch> = destination.arrow().unwrap();
Ok(datafusion::datasource::MemTable::try_new(
schema_ref,
vec![data],
)?)
}
#[cfg(not(feature = "database-mysql"))]
{
return Err(ColumnQError::Database(
"MySQL database feature not enabled.".to_string(),
));
}
}
DatabaseLoader::SQLite => {
#[cfg(feature = "database-sqlite")]
{
let uri = t.get_uri_str().replace("sqlite://", "");
let source = SQLiteSource::new(&uri, 2)
.map_err(|e| ColumnQError::Database(e.to_string()))?;
let dispatcher = Dispatcher::<
SQLiteSource,
ArrowDestination,
SQLiteArrowTransport,
>::new(
source, &mut destination, queries, None
);
dispatcher
.run()
.map_err(|e| ColumnQError::Database(e.to_string()))?;
let schema_ref = destination.arrow_schema();
let data: Vec<RecordBatch> = destination.arrow().unwrap();
Ok(datafusion::datasource::MemTable::try_new(
schema_ref,
vec![data],
)?)
}
#[cfg(not(feature = "database-sqlite"))]
{
return Err(ColumnQError::Database(
"SQLite database feature not enabled.".to_string(),
));
}
}
DatabaseLoader::Postgres => {
#[cfg(feature = "database-postgres")]
{
let config = t
.get_uri_str()
.parse::<tokio_postgres::Config>()
.map_err(|e| ColumnQError::Database(e.to_string()))?;
let tls = match config.get_ssl_mode() {
tokio_postgres::config::SslMode::Require => tokio_postgres::NoTls,
_ => tokio_postgres::NoTls,
};
let source: PostgresSource<
postgres::BinaryProtocol,
tokio_postgres::NoTls,
> = PostgresSource::new(config.into(), tls, 2)
.map_err(|e| ColumnQError::Database(e.to_string()))?;
let queries = queries.clone();
let task = tokio::task::spawn_blocking(move || {
let dispatcher = Dispatcher::<
PostgresSource<postgres::BinaryProtocol, tokio_postgres::NoTls>,
ArrowDestination,
PostgresArrowTransport<
postgres::BinaryProtocol,
tokio_postgres::NoTls,
>,
>::new(
source, &mut destination, &queries, None
);
if let Err(e) = dispatcher.run() {
return Err(ColumnQError::Database(e.to_string()));
}
let schema_ref = destination.arrow_schema();
match destination.arrow() {
Ok(data) => datafusion::datasource::MemTable::try_new(
schema_ref,
vec![data],
)
.map_err(|e| ColumnQError::Database(e.to_string())),
Err(e) => Err(ColumnQError::Database(e.to_string())),
}
});
// FIXME: Consider another way to wait for the spawned blocking task instead of using block_on.
futures::executor::block_on(task)
.map_err(|e| ColumnQError::Database(e.to_string()))?
}
#[cfg(not(feature = "database-postgres"))]
{
return Err(ColumnQError::Database(
"Postgres database feature not enabled.".to_string(),
));
}
}
}
let queries = CXQuery::naked(format!("SELECT * FROM {}", t.name));
let source = SourceConn::try_from(t.get_uri_str())
.map_err(|e| ColumnQError::Database(e.to_string()))?;
let destination = connectorx::get_arrow::get_arrow(&source, None, &[queries])
.map_err(|e| ColumnQError::Database(e.to_string()))?;
Ok(datafusion::datasource::MemTable::try_new(
destination.arrow_schema(),
vec![destination.arrow().unwrap()],
)?)
}
}
}