diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index 7afdd89..32561c6 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -289,7 +289,7 @@ jobs: # set SDKROOT for C dependencies export SDKROOT=$(xcrun --sdk macosx --show-sdk-path) cd roapi && \ - cargo build --bin roapi --features database --target aarch64-apple-darwin + cargo build --bin roapi --features database-sqlite,database-mysql --target aarch64-apple-darwin - name: Trim cache run: | which cargo-cache || cargo install cargo-cache diff --git a/Cargo.lock b/Cargo.lock index 05ab66f..f4973e0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1207,6 +1207,7 @@ dependencies = [ "tempfile", "thiserror", "tokio", + "tokio-postgres", "toml 0.7.4", "uriparse", "url", @@ -1271,24 +1272,34 @@ dependencies = [ "anyhow", "arrow", "chrono", + "csv", "fallible-streaming-iterator", "fehler", + "hex", "itertools", "log", "mysql_common", + "native-tls", "num-traits", + "openssl", "owning_ref", + "postgres", + "postgres-native-tls", + "postgres-openssl", "r2d2", "r2d2_mysql", + "r2d2_postgres", "r2d2_sqlite", "rayon", "rusqlite", "rust_decimal", + "rust_decimal_macros", "serde_json", "sqlparser 0.11.0", "thiserror", "url", "urlencoding", + "uuid 0.8.2", ] [[package]] @@ -1574,7 +1585,7 @@ dependencies = [ "tokio-stream", "tokio-util", "url", - "uuid", + "uuid 1.3.3", "xz2", "zstd 0.12.3+zstd.1.5.2", ] @@ -1672,7 +1683,7 @@ dependencies = [ "regex", "sha2 0.10.6", "unicode-segmentation", - "uuid", + "uuid 1.3.3", ] [[package]] @@ -1759,7 +1770,7 @@ dependencies = [ "thiserror", "tokio", "url", - "uuid", + "uuid 1.3.3", ] [[package]] @@ -1865,7 +1876,7 @@ dependencies = [ "rusoto_dynamodb", "thiserror", "tokio", - "uuid", + "uuid 1.3.3", ] [[package]] @@ -2998,7 +3009,7 @@ dependencies = [ "subprocess", "thiserror", "time 0.3.22", - "uuid", + "uuid 1.3.3", ] [[package]] @@ -3517,6 +3528,32 @@ dependencies = [ "tokio-postgres", ] +[[package]] +name = "postgres-native-tls" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2d442770e2b1e244bb5eb03b31c79b65bb2568f413b899eaba850fa945a65954" +dependencies = [ + "futures", + "native-tls", + "tokio", + "tokio-native-tls", + "tokio-postgres", +] + +[[package]] +name = "postgres-openssl" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1de0ea6504e07ca78355a6fb88ad0f36cafe9e696cbc6717f16a207f3a60be72" +dependencies = [ + "futures", + "openssl", + "tokio", + "tokio-openssl", + "tokio-postgres", +] + [[package]] name = "postgres-protocol" version = "0.6.5" @@ -3542,8 +3579,12 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f028f05971fe20f512bcc679e2c10227e57809a3af86a7606304435bc8896cd6" dependencies = [ "bytes", + "chrono", "fallible-iterator", "postgres-protocol", + "serde", + "serde_json", + "uuid 0.8.2", ] [[package]] @@ -3681,6 +3722,16 @@ dependencies = [ "r2d2", ] +[[package]] +name = "r2d2_postgres" +version = "0.18.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7029c56be658cb54f321e0bee597810ee16796b735fa2559d7056bf06b12230b" +dependencies = [ + "postgres", + "r2d2", +] + [[package]] name = "r2d2_sqlite" version = "0.20.0" @@ -3893,7 +3944,7 @@ dependencies = [ "rkyv_derive", "seahash", "tinyvec", - "uuid", + "uuid 1.3.3", ] [[package]] @@ -4071,6 +4122,16 @@ dependencies = [ "serde_json", ] +[[package]] +name = "rust_decimal_macros" +version = "1.30.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7ca5c398d85f83b9a44de754a2048625a8c5eafcf070da7b8f116b685e2f6608" +dependencies = [ + "quote", + "rust_decimal", +] + [[package]] name = "rustc-hash" version = "1.1.0" @@ -4844,6 +4905,18 @@ dependencies = [ "tokio", ] +[[package]] +name = "tokio-openssl" +version = "0.6.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c08f9ffb7809f1b20c1b398d92acf4cc719874b3b2b2d9ea2f09b4a80350878a" +dependencies = [ + "futures-util", + "openssl", + "openssl-sys", + "tokio", +] + [[package]] name = "tokio-postgres" version = "0.7.8" @@ -5170,6 +5243,12 @@ version = "0.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "711b9620af191e0cdc7468a8d14e709c3dcdb115b36f838e601583af800a370a" +[[package]] +name = "uuid" +version = "0.8.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bc5cf98d8186244414c848017f0e2676b3fcb46807f6668a97dfe67359a3c4b7" + [[package]] name = "uuid" version = "1.3.3" diff --git a/README.md b/README.md index f48a526..9903a0a 100644 --- a/README.md +++ b/README.md @@ -310,7 +310,7 @@ Data layer: - [x] Google spreadsheet - [x] MySQL - [x] SQLite -- [ ] Postgres +- [x] Postgres - [ ] Airtable - Data format - [x] CSV diff --git a/columnq/Cargo.toml b/columnq/Cargo.toml index 1e05027..edee6fa 100644 --- a/columnq/Cargo.toml +++ b/columnq/Cargo.toml @@ -13,7 +13,7 @@ path = "src/lib.rs" [dependencies] # pulling arrow-schema manually to enable the serde feature. # TODO: add serde feature in datafusion to avoid this workaround -arrow-schema = { version ="37.0.0", features = ["serde"] } +arrow-schema = { version = "37.0.0", features = ["serde"] } datafusion = "23" object_store = { version = "0.5.6", features = ["aws_profile", "gcp", "azure"] } @@ -24,7 +24,7 @@ log = "0" regex = "1" lazy_static = "1" graphql-parser = "0" -sqlparser = "0.33" # version need to be in sync with convergence +sqlparser = "0.33" # version need to be in sync with convergence yup-oauth2 = { version = "6.2", default-features = false, features = [ "service_account", ] } @@ -44,6 +44,7 @@ tokio = { version = "1", features = ["rt-multi-thread"] } futures = "0.3" hyper-tls = { version = "0.5.0", default-features = false, optional = true } hyper-rustls = { version = "0.23.2", default-features = false, optional = true } +tokio-postgres = { version = "0.7.8", optional = true } [dependencies.deltalake] default-features = false @@ -92,7 +93,9 @@ native-tls = [ simd = ["datafusion/simd"] database-sqlite = ["connectorx/src_sqlite"] database-mysql = ["connectorx/src_mysql"] +database-postgres = ["connectorx/src_postgres", "dep:tokio-postgres"] database = [ "database-sqlite", "database-mysql", + "database-postgres" ] diff --git a/columnq/src/table/database.rs b/columnq/src/table/database.rs index 7f34c4b..5fbe6a6 100644 --- a/columnq/src/table/database.rs +++ b/columnq/src/table/database.rs @@ -4,13 +4,19 @@ pub enum DatabaseLoader { Postgres, } -#[cfg(any(feature = "database-sqlite", feature = "database-mysql"))] +#[cfg(any( + feature = "database-sqlite", + feature = "database-mysql", + feature = "database-postgres" +))] mod imp { use crate::error::ColumnQError; use crate::table::TableSource; use connectorx::prelude::*; - #[cfg(feature = "database-mysql")] - use connectorx::sources::mysql::BinaryProtocol; + #[cfg(any(feature = "database-mysql"))] + use connectorx::sources::mysql; + #[cfg(any(feature = "database-postgres"))] + use connectorx::sources::postgres; use datafusion::arrow::record_batch::RecordBatch; use log::debug; @@ -28,18 +34,24 @@ mod imp { DatabaseLoader::MySQL => { #[cfg(feature = "database-mysql")] { - let source = MySQLSource::::new(t.get_uri_str(), 2) + let source = MySQLSource::::new(t.get_uri_str(), 2) .map_err(|e| ColumnQError::Database(e.to_string()))?; let dispatcher = Dispatcher::< - MySQLSource, + MySQLSource, ArrowDestination, - MySQLArrowTransport, + MySQLArrowTransport, >::new( source, &mut destination, queries, None ); dispatcher .run() .map_err(|e| ColumnQError::Database(e.to_string()))?; + let schema_ref = destination.arrow_schema(); + let data: Vec = destination.arrow().unwrap(); + Ok(datafusion::datasource::MemTable::try_new( + schema_ref, + vec![data], + )?) } #[cfg(not(feature = "database-mysql"))] { @@ -64,6 +76,13 @@ mod imp { dispatcher .run() .map_err(|e| ColumnQError::Database(e.to_string()))?; + + let schema_ref = destination.arrow_schema(); + let data: Vec = destination.arrow().unwrap(); + Ok(datafusion::datasource::MemTable::try_new( + schema_ref, + vec![data], + )?) } #[cfg(not(feature = "database-sqlite"))] { @@ -73,23 +92,70 @@ mod imp { } } DatabaseLoader::Postgres => { - // ToDo `Cannot start a runtime from within a runtime` error in `connector-x PostgresSource` - return Err(ColumnQError::Database( - "Postgres database features not be supported for now.".to_string(), - )); + #[cfg(feature = "database-postgres")] + { + let config = t + .get_uri_str() + .parse::() + .map_err(|e| ColumnQError::Database(e.to_string()))?; + let tls = match config.get_ssl_mode() { + tokio_postgres::config::SslMode::Require => tokio_postgres::NoTls, + _ => tokio_postgres::NoTls, + }; + let source: PostgresSource< + postgres::BinaryProtocol, + tokio_postgres::NoTls, + > = PostgresSource::new(config.into(), tls, 2) + .map_err(|e| ColumnQError::Database(e.to_string()))?; + let queries = queries.clone(); + let task = tokio::task::spawn_blocking(move || { + let dispatcher = Dispatcher::< + PostgresSource, + ArrowDestination, + PostgresArrowTransport< + postgres::BinaryProtocol, + tokio_postgres::NoTls, + >, + >::new( + source, &mut destination, &queries, None + ); + + if let Err(e) = dispatcher.run() { + return Err(ColumnQError::Database(e.to_string())); + } + + let schema_ref = destination.arrow_schema(); + match destination.arrow() { + Ok(data) => datafusion::datasource::MemTable::try_new( + schema_ref, + vec![data], + ) + .map_err(|e| ColumnQError::Database(e.to_string())), + Err(e) => Err(ColumnQError::Database(e.to_string())), + } + }); + + // FIXME: Maybe use other way to block the async task instead of block_on + futures::executor::block_on(task) + .map_err(|e| ColumnQError::Database(e.to_string()))? + } + #[cfg(not(feature = "database-postgres"))] + { + return Err(ColumnQError::Database( + "Postgres database feature not enabled.".to_string(), + )); + } } - }; - let schema_ref = destination.arrow_schema(); - let data: Vec = destination.arrow().unwrap(); - Ok(datafusion::datasource::MemTable::try_new( - schema_ref, - vec![data], - )?) + } } } } -#[cfg(not(any(feature = "database-sqlite", feature = "database-mysql")))] +#[cfg(not(any( + feature = "database-sqlite", + feature = "database-mysql", + feature = "database-postgres" +)))] mod imp { use crate::error::ColumnQError; use crate::table::TableSource; diff --git a/columnq/tests/table_postgres_test.rs b/columnq/tests/table_postgres_test.rs new file mode 100644 index 0000000..357b5a6 --- /dev/null +++ b/columnq/tests/table_postgres_test.rs @@ -0,0 +1,24 @@ +#[cfg(any(feature = "database-postgres"))] +mod postgres { + use datafusion::datasource::TableProvider; + use datafusion::prelude::SessionContext; + use std::env; + + use columnq::table::TableSource; + + use columnq::table::database::DatabaseLoader; + + #[tokio::test] + async fn load_postgres() -> anyhow::Result<()> { + dotenvy::dotenv().ok(); + if let Ok(name) = env::var("TABLE_NAME") { + let t = DatabaseLoader::Postgres + .to_mem_table(&TableSource::new(name, env::var("POSGRES_URL")?))?; + let ctx = SessionContext::new(); + let stats = t.scan(&ctx.state(), None, &[], None).await?.statistics(); + assert!(stats.num_rows.is_some()); + } + + Ok(()) + } +}