feat: support postgresql (#286)

This commit is contained in:
Joe 2023-08-30 00:06:11 +08:00 committed by GitHub
parent b8b772a018
commit ff3617767d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 200 additions and 28 deletions

View File

@ -289,7 +289,7 @@ jobs:
# set SDKROOT for C dependencies
export SDKROOT=$(xcrun --sdk macosx --show-sdk-path)
cd roapi && \
cargo build --bin roapi --features database --target aarch64-apple-darwin
cargo build --bin roapi --features database-sqlite,database-mysql --target aarch64-apple-darwin
- name: Trim cache
run: |
which cargo-cache || cargo install cargo-cache

91
Cargo.lock generated
View File

@ -1207,6 +1207,7 @@ dependencies = [
"tempfile",
"thiserror",
"tokio",
"tokio-postgres",
"toml 0.7.4",
"uriparse",
"url",
@ -1271,24 +1272,34 @@ dependencies = [
"anyhow",
"arrow",
"chrono",
"csv",
"fallible-streaming-iterator",
"fehler",
"hex",
"itertools",
"log",
"mysql_common",
"native-tls",
"num-traits",
"openssl",
"owning_ref",
"postgres",
"postgres-native-tls",
"postgres-openssl",
"r2d2",
"r2d2_mysql",
"r2d2_postgres",
"r2d2_sqlite",
"rayon",
"rusqlite",
"rust_decimal",
"rust_decimal_macros",
"serde_json",
"sqlparser 0.11.0",
"thiserror",
"url",
"urlencoding",
"uuid 0.8.2",
]
[[package]]
@ -1574,7 +1585,7 @@ dependencies = [
"tokio-stream",
"tokio-util",
"url",
"uuid",
"uuid 1.3.3",
"xz2",
"zstd 0.12.3+zstd.1.5.2",
]
@ -1672,7 +1683,7 @@ dependencies = [
"regex",
"sha2 0.10.6",
"unicode-segmentation",
"uuid",
"uuid 1.3.3",
]
[[package]]
@ -1759,7 +1770,7 @@ dependencies = [
"thiserror",
"tokio",
"url",
"uuid",
"uuid 1.3.3",
]
[[package]]
@ -1865,7 +1876,7 @@ dependencies = [
"rusoto_dynamodb",
"thiserror",
"tokio",
"uuid",
"uuid 1.3.3",
]
[[package]]
@ -2998,7 +3009,7 @@ dependencies = [
"subprocess",
"thiserror",
"time 0.3.22",
"uuid",
"uuid 1.3.3",
]
[[package]]
@ -3517,6 +3528,32 @@ dependencies = [
"tokio-postgres",
]
[[package]]
name = "postgres-native-tls"
version = "0.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2d442770e2b1e244bb5eb03b31c79b65bb2568f413b899eaba850fa945a65954"
dependencies = [
"futures",
"native-tls",
"tokio",
"tokio-native-tls",
"tokio-postgres",
]
[[package]]
name = "postgres-openssl"
version = "0.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1de0ea6504e07ca78355a6fb88ad0f36cafe9e696cbc6717f16a207f3a60be72"
dependencies = [
"futures",
"openssl",
"tokio",
"tokio-openssl",
"tokio-postgres",
]
[[package]]
name = "postgres-protocol"
version = "0.6.5"
@ -3542,8 +3579,12 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f028f05971fe20f512bcc679e2c10227e57809a3af86a7606304435bc8896cd6"
dependencies = [
"bytes",
"chrono",
"fallible-iterator",
"postgres-protocol",
"serde",
"serde_json",
"uuid 0.8.2",
]
[[package]]
@ -3681,6 +3722,16 @@ dependencies = [
"r2d2",
]
[[package]]
name = "r2d2_postgres"
version = "0.18.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7029c56be658cb54f321e0bee597810ee16796b735fa2559d7056bf06b12230b"
dependencies = [
"postgres",
"r2d2",
]
[[package]]
name = "r2d2_sqlite"
version = "0.20.0"
@ -3893,7 +3944,7 @@ dependencies = [
"rkyv_derive",
"seahash",
"tinyvec",
"uuid",
"uuid 1.3.3",
]
[[package]]
@ -4071,6 +4122,16 @@ dependencies = [
"serde_json",
]
[[package]]
name = "rust_decimal_macros"
version = "1.30.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7ca5c398d85f83b9a44de754a2048625a8c5eafcf070da7b8f116b685e2f6608"
dependencies = [
"quote",
"rust_decimal",
]
[[package]]
name = "rustc-hash"
version = "1.1.0"
@ -4844,6 +4905,18 @@ dependencies = [
"tokio",
]
[[package]]
name = "tokio-openssl"
version = "0.6.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c08f9ffb7809f1b20c1b398d92acf4cc719874b3b2b2d9ea2f09b4a80350878a"
dependencies = [
"futures-util",
"openssl",
"openssl-sys",
"tokio",
]
[[package]]
name = "tokio-postgres"
version = "0.7.8"
@ -5170,6 +5243,12 @@ version = "0.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "711b9620af191e0cdc7468a8d14e709c3dcdb115b36f838e601583af800a370a"
[[package]]
name = "uuid"
version = "0.8.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bc5cf98d8186244414c848017f0e2676b3fcb46807f6668a97dfe67359a3c4b7"
[[package]]
name = "uuid"
version = "1.3.3"

View File

@ -310,7 +310,7 @@ Data layer:
- [x] Google spreadsheet
- [x] MySQL
- [x] SQLite
- [ ] Postgres
- [x] Postgres
- [ ] Airtable
- Data format
- [x] CSV

View File

@ -13,7 +13,7 @@ path = "src/lib.rs"
[dependencies]
# pulling arrow-schema manually to enable the serde feature.
# TODO: add serde feature in datafusion to avoid this workaround
arrow-schema = { version ="37.0.0", features = ["serde"] }
arrow-schema = { version = "37.0.0", features = ["serde"] }
datafusion = "23"
object_store = { version = "0.5.6", features = ["aws_profile", "gcp", "azure"] }
@ -24,7 +24,7 @@ log = "0"
regex = "1"
lazy_static = "1"
graphql-parser = "0"
sqlparser = "0.33" # version need to be in sync with convergence
sqlparser = "0.33" # version need to be in sync with convergence
yup-oauth2 = { version = "6.2", default-features = false, features = [
"service_account",
] }
@ -44,6 +44,7 @@ tokio = { version = "1", features = ["rt-multi-thread"] }
futures = "0.3"
hyper-tls = { version = "0.5.0", default-features = false, optional = true }
hyper-rustls = { version = "0.23.2", default-features = false, optional = true }
tokio-postgres = { version = "0.7.8", optional = true }
[dependencies.deltalake]
default-features = false
@ -92,7 +93,9 @@ native-tls = [
simd = ["datafusion/simd"]
database-sqlite = ["connectorx/src_sqlite"]
database-mysql = ["connectorx/src_mysql"]
database-postgres = ["connectorx/src_postgres", "dep:tokio-postgres"]
database = [
"database-sqlite",
"database-mysql",
"database-postgres"
]

View File

@ -4,13 +4,19 @@ pub enum DatabaseLoader {
Postgres,
}
#[cfg(any(feature = "database-sqlite", feature = "database-mysql"))]
#[cfg(any(
feature = "database-sqlite",
feature = "database-mysql",
feature = "database-postgres"
))]
mod imp {
use crate::error::ColumnQError;
use crate::table::TableSource;
use connectorx::prelude::*;
#[cfg(feature = "database-mysql")]
use connectorx::sources::mysql::BinaryProtocol;
#[cfg(any(feature = "database-mysql"))]
use connectorx::sources::mysql;
#[cfg(any(feature = "database-postgres"))]
use connectorx::sources::postgres;
use datafusion::arrow::record_batch::RecordBatch;
use log::debug;
@ -28,18 +34,24 @@ mod imp {
DatabaseLoader::MySQL => {
#[cfg(feature = "database-mysql")]
{
let source = MySQLSource::<BinaryProtocol>::new(t.get_uri_str(), 2)
let source = MySQLSource::<mysql::BinaryProtocol>::new(t.get_uri_str(), 2)
.map_err(|e| ColumnQError::Database(e.to_string()))?;
let dispatcher = Dispatcher::<
MySQLSource<BinaryProtocol>,
MySQLSource<mysql::BinaryProtocol>,
ArrowDestination,
MySQLArrowTransport<BinaryProtocol>,
MySQLArrowTransport<mysql::BinaryProtocol>,
>::new(
source, &mut destination, queries, None
);
dispatcher
.run()
.map_err(|e| ColumnQError::Database(e.to_string()))?;
let schema_ref = destination.arrow_schema();
let data: Vec<RecordBatch> = destination.arrow().unwrap();
Ok(datafusion::datasource::MemTable::try_new(
schema_ref,
vec![data],
)?)
}
#[cfg(not(feature = "database-mysql"))]
{
@ -64,6 +76,13 @@ mod imp {
dispatcher
.run()
.map_err(|e| ColumnQError::Database(e.to_string()))?;
let schema_ref = destination.arrow_schema();
let data: Vec<RecordBatch> = destination.arrow().unwrap();
Ok(datafusion::datasource::MemTable::try_new(
schema_ref,
vec![data],
)?)
}
#[cfg(not(feature = "database-sqlite"))]
{
@ -73,23 +92,70 @@ mod imp {
}
}
DatabaseLoader::Postgres => {
// ToDo `Cannot start a runtime from within a runtime` error in `connector-x PostgresSource`
return Err(ColumnQError::Database(
"Postgres database features not be supported for now.".to_string(),
));
#[cfg(feature = "database-postgres")]
{
let config = t
.get_uri_str()
.parse::<tokio_postgres::Config>()
.map_err(|e| ColumnQError::Database(e.to_string()))?;
let tls = match config.get_ssl_mode() {
tokio_postgres::config::SslMode::Require => tokio_postgres::NoTls,
_ => tokio_postgres::NoTls,
};
let source: PostgresSource<
postgres::BinaryProtocol,
tokio_postgres::NoTls,
> = PostgresSource::new(config.into(), tls, 2)
.map_err(|e| ColumnQError::Database(e.to_string()))?;
let queries = queries.clone();
let task = tokio::task::spawn_blocking(move || {
let dispatcher = Dispatcher::<
PostgresSource<postgres::BinaryProtocol, tokio_postgres::NoTls>,
ArrowDestination,
PostgresArrowTransport<
postgres::BinaryProtocol,
tokio_postgres::NoTls,
>,
>::new(
source, &mut destination, &queries, None
);
if let Err(e) = dispatcher.run() {
return Err(ColumnQError::Database(e.to_string()));
}
let schema_ref = destination.arrow_schema();
match destination.arrow() {
Ok(data) => datafusion::datasource::MemTable::try_new(
schema_ref,
vec![data],
)
.map_err(|e| ColumnQError::Database(e.to_string())),
Err(e) => Err(ColumnQError::Database(e.to_string())),
}
});
// FIXME: Maybe use other way to block the async task instead of block_on
futures::executor::block_on(task)
.map_err(|e| ColumnQError::Database(e.to_string()))?
}
#[cfg(not(feature = "database-postgres"))]
{
return Err(ColumnQError::Database(
"Postgres database feature not enabled.".to_string(),
));
}
}
};
let schema_ref = destination.arrow_schema();
let data: Vec<RecordBatch> = destination.arrow().unwrap();
Ok(datafusion::datasource::MemTable::try_new(
schema_ref,
vec![data],
)?)
}
}
}
}
#[cfg(not(any(feature = "database-sqlite", feature = "database-mysql")))]
#[cfg(not(any(
feature = "database-sqlite",
feature = "database-mysql",
feature = "database-postgres"
)))]
mod imp {
use crate::error::ColumnQError;
use crate::table::TableSource;

View File

@ -0,0 +1,24 @@
#[cfg(any(feature = "database-postgres"))]
mod postgres {
use datafusion::datasource::TableProvider;
use datafusion::prelude::SessionContext;
use std::env;
use columnq::table::TableSource;
use columnq::table::database::DatabaseLoader;
#[tokio::test]
async fn load_postgres() -> anyhow::Result<()> {
dotenvy::dotenv().ok();
if let Ok(name) = env::var("TABLE_NAME") {
let t = DatabaseLoader::Postgres
.to_mem_table(&TableSource::new(name, env::var("POSGRES_URL")?))?;
let ctx = SessionContext::new();
let stats = t.scan(&ctx.state(), None, &[], None).await?.statistics();
assert!(stats.num_rows.is_some());
}
Ok(())
}
}