diff --git a/Cargo.lock b/Cargo.lock index bbcf9fe..c96b4cf 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -524,7 +524,7 @@ dependencies = [ [[package]] name = "columnq" -version = "0.4.5" +version = "0.4.6" dependencies = [ "anyhow", "bytes", @@ -610,7 +610,8 @@ dependencies = [ [[package]] name = "connectorx" version = "0.3.1-alpha.1" -source = "git+https://github.com/roapi/connector-x.git?branch=backport_main#d0cf0a7d1ca9c2826e6b9e5f8588c14ac975c593" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0c5690db42d8f378b7e8fbeff38a645ef5adf165968de67de577f897df158252" dependencies = [ "anyhow", "arrow", @@ -2962,7 +2963,7 @@ dependencies = [ [[package]] name = "roapi" -version = "0.7.0" +version = "0.7.1" dependencies = [ "anyhow", "async-process", @@ -2972,7 +2973,7 @@ dependencies = [ "columnq", "convergence", "convergence-arrow", - "env_logger 0.8.4", + "env_logger 0.9.0", "hyper", "log", "pin-project", @@ -2985,6 +2986,7 @@ dependencies = [ "sqlparser 0.17.0", "thiserror", "tokio", + "tokio-postgres", "tower-http", "tower-layer", "tracing", diff --git a/README.md b/README.md index 35ac44f..7c9fc91 100644 --- a/README.md +++ b/README.md @@ -31,7 +31,7 @@ See below for a high level diagram: ```bash # if you are using homebrew brew install roapi -# or if you prefer python / pip +# or if you prefer pip pip install roapi ``` @@ -69,7 +69,7 @@ roapi \ Or using docker: ```bash -docker run -t --rm -p 8080:8080 ghcr.io/roapi/roapi:latest --addr 0.0.0.0:8080 \ +docker run -t --rm -p 8080:8080 ghcr.io/roapi/roapi:latest --addr-http 0.0.0.0:8080 \ --table "uk_cities=test_data/uk_cities_with_headers.csv" \ --table "test_data/spacex_launches.json" ``` @@ -123,7 +123,9 @@ You can also configure multiple table sources using YAML config, which supports advanced format specific table options: ```yaml -addr: 0.0.0.0:8084 +addr: + http: 0.0.0.0:8084 + postgres: 0.0.0.0:5433 tables: - name: "blogs" uri: "test_data/blogs.parquet" diff --git a/columnq/Cargo.toml b/columnq/Cargo.toml index 4236c9d..e239a3f 100644 --- a/columnq/Cargo.toml +++ b/columnq/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "columnq" -version = "0.4.5" +version = "0.4.6" homepage = "https://github.com/roapi/roapi" license = "MIT" authors = ["QP Hou "] @@ -49,8 +49,8 @@ default-features = false features = ["datafusion-ext"] [dependencies.connectorx] -git = "https://github.com/roapi/connector-x.git" -branch = "backport_main" +# git = "https://github.com/roapi/connector-x.git" +# branch = "backport_main" version = "0.3.1-alpha.1" features = ["default", "dst_arrow", "src_mysql", "src_sqlite"] optional = true diff --git a/roapi/Cargo.toml b/roapi/Cargo.toml index 0f13160..465ffbb 100644 --- a/roapi/Cargo.toml +++ b/roapi/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "roapi" -version = "0.7.0" +version = "0.7.1" authors = ["QP Hou "] homepage = "https://github.com/roapi/roapi" license = "MIT" @@ -58,6 +58,7 @@ reqwest = { version = "0.11", default-features = false, features = [ "rustls-tls", ] } async-process = "1.3.0" +tokio-postgres = "0.7" # TODO: uncomment this when we exclude roapi from root workspace # [profile.release] diff --git a/roapi/src/server/http/mod.rs b/roapi/src/server/http/mod.rs index deca246..094c9cf 100644 --- a/roapi/src/server/http/mod.rs +++ b/roapi/src/server/http/mod.rs @@ -21,7 +21,7 @@ pub fn build_http_server( tables: Arc>>, config: &Config, default_host: String, -) -> anyhow::Result<(HttpApiServer, u16)> { +) -> anyhow::Result<(HttpApiServer, std::net::SocketAddr)> { let default_http_port = std::env::var("PORT").unwrap_or_else(|_| "8080".to_string()); let default_http_addr = [default_host, default_http_port].join(":"); let http_addr = config @@ -46,12 +46,11 @@ pub fn build_http_server( } let listener = TcpListener::bind(http_addr)?; - let http_port = listener + let addr = listener .local_addr() - .expect("Failed to get address from listener") - .port(); + .expect("Failed to get address from listener"); let http_server = axum::Server::from_tcp(listener).unwrap(); let http_server = http_server.serve(app.into_make_service()); - Ok((http_server, http_port)) + Ok((http_server, addr)) } diff --git a/roapi/src/server/mod.rs b/roapi/src/server/mod.rs index e6ad706..92e967b 100644 --- a/roapi/src/server/mod.rs +++ b/roapi/src/server/mod.rs @@ -2,7 +2,7 @@ use async_trait::async_trait; #[async_trait] pub trait RunnableServer: Send + Sync { - fn port(&self) -> u16; + fn addr(&self) -> std::net::SocketAddr; async fn run(&self) -> anyhow::Result<()>; } diff --git a/roapi/src/server/postgres.rs b/roapi/src/server/postgres.rs index ae51590..1b8a54a 100644 --- a/roapi/src/server/postgres.rs +++ b/roapi/src/server/postgres.rs @@ -1,3 +1,7 @@ +// Wire protocol reference: +// https://www.postgresql.org/docs/current/protocol-message-formats.html +// https://beta.pgcon.org/2014/schedule/attachments/330_postgres-for-the-wire.pdf + use async_trait::async_trait; use std::sync::Arc; @@ -7,6 +11,7 @@ use convergence::engine::{Engine, Portal}; use convergence::protocol::{ErrorResponse, FieldDescription, SqlState}; use convergence::protocol_ext::DataRowBatch; use convergence_arrow::table::{record_batch_to_rows, schema_to_field_desc}; +use log::info; use sqlparser::ast::Statement; use tokio::net::TcpListener; @@ -46,6 +51,7 @@ impl Engine for RoapiContextEngine { statement: &Statement, ) -> Result, ErrorResponse> { let query = statement.to_string(); + info!("preparing query: {}", &query); let df = self.ctx.sql_to_df(&query).await.map_err(df_err_to_sql)?; schema_to_field_desc(&df.schema().clone().into()) } @@ -62,7 +68,7 @@ impl Engine for RoapiContextEngine { pub struct PostgresServer { pub ctx: Arc, - pub port: u16, + pub addr: std::net::SocketAddr, pub listener: TcpListener, } @@ -81,10 +87,9 @@ impl PostgresServer { .expect("Failed to bind address for Postgres server"); Self { ctx, - port: listener + addr: listener .local_addr() - .expect("Failed to get address from listener") - .port(), + .expect("Failed to get address from listener"), listener, } } @@ -92,8 +97,8 @@ impl PostgresServer { #[async_trait] impl RunnableServer for PostgresServer { - fn port(&self) -> u16 { - self.port + fn addr(&self) -> std::net::SocketAddr { + self.addr } async fn run(&self) -> anyhow::Result<()> { diff --git a/roapi/src/startup.rs b/roapi/src/startup.rs index b3b6c7e..3c3f2f3 100644 --- a/roapi/src/startup.rs +++ b/roapi/src/startup.rs @@ -2,6 +2,7 @@ use std::collections::HashMap; use std::sync::Arc; use columnq::table::TableSource; +use log::info; use tokio::sync::{Mutex, RwLock}; use crate::config::Config; @@ -10,7 +11,7 @@ use crate::context::RawRoapiContext; use crate::server; pub struct Application { - http_port: u16, + http_addr: std::net::SocketAddr, http_server: server::http::HttpApiServer, postgres_server: Box, } @@ -40,7 +41,7 @@ impl Application { ) .await, ); - let (http_server, http_port) = server::http::build_http_server::( + let (http_server, http_addr) = server::http::build_http_server::( ctx_ext, tables, &config, @@ -48,7 +49,7 @@ impl Application { )?; Ok(Self { - http_port, + http_addr, http_server, postgres_server, }) @@ -62,7 +63,7 @@ impl Application { ) .await, ); - let (http_server, http_port) = server::http::build_http_server::( + let (http_server, http_addr) = server::http::build_http_server::( ctx_ext, tables, &config, @@ -70,23 +71,27 @@ impl Application { )?; Ok(Self { - http_port, + http_addr, http_server, postgres_server, }) } } - pub fn http_port(&self) -> u16 { - self.http_port + pub fn http_addr(&self) -> std::net::SocketAddr { + self.http_addr } - pub fn postgres_port(&self) -> u16 { - self.postgres_server.port() + pub fn postgres_addr(&self) -> std::net::SocketAddr { + self.postgres_server.addr() } pub async fn run_until_stopped(self) -> anyhow::Result<()> { let postgres_server = self.postgres_server; + info!( + "🚀 Listening on {} for Postgres traffic...", + postgres_server.addr() + ); tokio::spawn(async move { postgres_server .run() @@ -94,6 +99,7 @@ impl Application { .expect("Failed to run postgres server"); }); + info!("🚀 Listening on {} for HTTP traffic...", self.http_addr); Ok(self.http_server.await?) } } diff --git a/roapi/tests/helpers.rs b/roapi/tests/helpers.rs index 5fb8fd9..203a0e5 100644 --- a/roapi/tests/helpers.rs +++ b/roapi/tests/helpers.rs @@ -37,10 +37,9 @@ pub async fn test_api_app( let app = Application::build(config) .await .expect("Failed to build application config"); - let port = app.http_port(); - let address = format!("http://localhost:{}", port); - (app, address) + let http_base = format!("http://{}", app.http_addr()); + (app, http_base) } pub async fn http_get(url: &str, accept: Option<&str>) -> reqwest::Response {