cleanup & update dependencies tree (#365)

update datafusion to 43, deltalake to 0.22, axum to 0.7 and arrow to 53

---------

Co-authored-by: hozan23 <hozan23@karyontech.net>
This commit is contained in:
QP Hou 2024-12-31 19:06:49 -08:00 committed by GitHub
parent 6e6b315ca3
commit a9317f73eb
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
21 changed files with 854 additions and 575 deletions

View File

@ -8,7 +8,7 @@ on:
env:
# NOTE: the version is also defined in roapi_release.yml and Dockerfile
RUST_TC_VER: "1.78.0"
RUST_TC_VER: "1.83.0"
jobs:
build:

View File

@ -12,7 +12,7 @@ on:
env:
# NOTE: the version is also defined in build.yml and Dockerfile
RUST_TC_VER: "1.78.0"
RUST_TC_VER: "1.83.0"
jobs:
# skip tag version validation on non-release branch run

View File

@ -12,7 +12,7 @@ on:
env:
# NOTE: the version is also defined in build.yml and Dockerfile
RUST_TC_VER: "1.78.0"
RUST_TC_VER: "1.83.0"
jobs:
validate-release-tag:

1224
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@ -1,4 +1,4 @@
ARG RUST_VER=1.78.0-bookworm
ARG RUST_VER=1.83.0-bookworm
ARG RUSTFLAGS='-C target-cpu=skylake'
ARG FEATURES="database"

View File

@ -11,52 +11,56 @@ name = "columnq"
path = "src/lib.rs"
[dependencies]
# pulling arrow-schema manually to enable the serde feature.
# TODO: add serde feature in datafusion to avoid this workaround
arrow-schema = { version = "52", features = ["serde"] }
datafusion = "39"
object_store = { version = "0", features = ["aws", "gcp", "azure"] }
percent-encoding = "2.2.0"
url = "2.2"
log = "0"
url = "2.5"
log = "0.4"
regex = "1"
lazy_static = "1"
graphql-parser = "0"
sqlparser = "0.44" # version need to be in sync with convergence and datafusion
yup-oauth2 = { version = "9", default-features = false, features = [
"service_account",
] }
lazy_static = "1.5"
thiserror = "1"
snafu = "0"
snafu = "0.8"
serde_json = { version = "1" }
serde_derive = "1"
serde = "1"
uriparse = "0"
bytes = { version = "1" }
reqwest = { version = "0.11", default-features = false, features = [
percent-encoding = "2.3"
# datafusion
datafusion = { version = "43", features = ["serde"] }
# spreadsheets reader
calamine = { version = "0.23.1", features = ["dates"] }
# graphql
graphql-parser = "0.4"
# async
tokio = { version = "1", features = ["rt-multi-thread"] }
futures = "0.3"
# net
reqwest = { version = "0.12", default-features = false, features = [
"blocking",
"json",
] }
calamine = {version = "0.23.1", features = ["dates"]}
hyper-tls = { version = "0.6.0", default-features = false, optional = true }
hyper-rustls = { version = "0.27", default-features = false, optional = true }
tokio = { version = "1", features = ["rt-multi-thread"] }
futures = "0.3"
hyper-tls = { version = "0.5.0", default-features = false, optional = true }
hyper-rustls = { version = "0.25", default-features = false, optional = true }
tokio-postgres = { version = "0.7.8", optional = true }
[dependencies.deltalake]
version = "0.18.1"
# git = "https://github.com/delta-io/delta-rs.git"
# rev = "63c14b3716428ff65e01404c6f7e62f341c98f05"
features = ["datafusion", "s3", "gcs", "azure"]
default-features = false
object_store = { version = "0.11", features = ["aws", "gcp", "azure"] }
tokio-postgres = { version = "0.7.12", optional = true }
deltalake = { version = "0.22", features = [
"datafusion",
"datafusion-ext",
"s3",
"gcs",
"azure",
] }
yup-oauth2 = { version = "10", default-features = false, features = [
"service_account",
] }
[dependencies.connectorx]
git = "https://github.com/roapi/connector-x.git"
rev = "f7ba1c38130e554cdb7dc4e04d7a166e3286d4e7"
rev = "0134b4852521d636250409823d21fb4a79a7d346"
version = "0.3.3-alpha.1"
features = ["default", "dst_arrow"]
optional = true
@ -95,8 +99,4 @@ native-tls = [
database-sqlite = ["connectorx/src_sqlite"]
database-mysql = ["connectorx/src_mysql"]
database-postgres = ["connectorx/src_postgres", "dep:tokio-postgres"]
database = [
"database-sqlite",
"database-mysql",
"database-postgres"
]
database = ["database-sqlite", "database-mysql", "database-postgres"]

View File

@ -75,7 +75,7 @@ impl ColumnQ {
);
let rn_config = RuntimeConfig::new();
let runtime_env =
RuntimeEnv::new(rn_config).expect("failed to create datafusion runtime env");
RuntimeEnv::try_new(rn_config).expect("failed to create datafusion runtime env");
let dfctx = SessionContext::new_with_config_rt(config, Arc::new(runtime_env));
let schema_map = HashMap::<String, arrow::datatypes::SchemaRef>::new();
let (refresh_tx, refresh_rx) = mpsc::channel(1024);

View File

@ -1,8 +1,5 @@
#![deny(warnings)]
#[macro_use]
extern crate lazy_static;
pub mod error;
macro_rules! partitions_from_table_source {
@ -37,11 +34,7 @@ pub mod table;
pub use crate::columnq::*;
pub use arrow_schema;
/// export datafusion and arrow so downstream won't need to declare dependencies on these libraries
pub use datafusion;
pub use datafusion::arrow;
pub use sqlparser;
pub use datafusion::{self, arrow, sql::sqlparser};
#[cfg(test)]
pub mod test_util;

View File

@ -1,5 +1,5 @@
use datafusion::arrow;
use datafusion::logical_expr::Operator;
use datafusion::logical_expr::{expr::Sort, Operator};
use datafusion::prelude::{binary_expr, Column, Expr};
use datafusion::scalar::ScalarValue;
use graphql_parser::query::{parse_query, Definition, OperationDefinition, Selection, Value};
@ -37,7 +37,7 @@ fn invalid_query(message: String) -> QueryError {
// convert order list from graphql argument to datafusion sort columns
//
// sort order matters, thus it's modeled as a list
fn to_datafusion_sort_columns(sort_columns: &[Value<String>]) -> Result<Vec<Expr>, QueryError> {
fn to_datafusion_sort_columns(sort_columns: &[Value<String>]) -> Result<Vec<Sort>, QueryError> {
sort_columns
.iter()
.map(|optval| match optval {

View File

@ -1,20 +1,22 @@
use datafusion::logical_expr::expr::Sort;
use datafusion::prelude::{Column, Expr};
use datafusion::{
logical_expr::expr::Sort,
prelude::{Column, Expr},
};
pub fn column_sort_expr_desc(column: String) -> Expr {
Expr::Sort(Sort {
expr: Box::new(Expr::Column(Column::from_name(column))),
pub fn column_sort_expr_desc(column: String) -> Sort {
Sort {
expr: Expr::Column(Column::from_name(column)),
asc: false,
nulls_first: true,
})
}
}
pub fn column_sort_expr_asc(column: impl Into<String>) -> Expr {
Expr::Sort(Sort {
expr: Box::new(Expr::Column(Column::from_name(column))),
pub fn column_sort_expr_asc(column: impl Into<String>) -> Sort {
Sort {
expr: Expr::Column(Column::from_name(column)),
asc: true,
nulls_first: true,
})
}
}
pub mod graphql;

View File

@ -6,8 +6,11 @@ use datafusion::prelude::{binary_expr, Column, Expr};
use datafusion::scalar::ScalarValue;
use regex::Regex;
use crate::error::QueryError;
use crate::query::{column_sort_expr_asc, column_sort_expr_desc};
use crate::{
error::QueryError,
query::{column_sort_expr_asc, column_sort_expr_desc},
sqlparser,
};
fn err_rest_query_value(error: sqlparser::tokenizer::TokenizerError) -> QueryError {
QueryError {
@ -58,7 +61,7 @@ pub fn apply_query(
mut df: datafusion::dataframe::DataFrame,
params: &HashMap<String, String>,
) -> Result<datafusion::dataframe::DataFrame, QueryError> {
lazy_static! {
lazy_static::lazy_static! {
static ref RE_REST_FILTER: Regex =
Regex::new(r"filter\[(?P<column>.+)\](?P<op>.+)?").unwrap();
}

View File

@ -1,11 +1,10 @@
use arrow_schema::TimeUnit;
use calamine::{open_workbook_auto, DataType as ExcelDataType, Range, Reader, Sheets};
use datafusion::arrow::array::{
ArrayRef, BooleanArray, DurationSecondArray, NullArray, PrimitiveArray, StringArray,
TimestampSecondArray,
};
use datafusion::arrow::datatypes::{
DataType, Date32Type, Date64Type, Field, Float64Type, Int64Type, Schema,
DataType, Date32Type, Date64Type, Field, Float64Type, Int64Type, Schema, TimeUnit,
};
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::datasource::TableProvider;

View File

@ -316,7 +316,7 @@ struct GetReqContext {
}
async fn gs_get_req_contex(t: &TableSource) -> Result<GetReqContext, table::Error> {
lazy_static! {
lazy_static::lazy_static! {
static ref RE_GOOGLE_SHEET: Regex =
Regex::new(r"https://docs.google.com/spreadsheets/d/(.+)").unwrap();
}

View File

@ -22,9 +22,9 @@ snmalloc-rs = { version = "0.3", optional = true }
# dependencies related to axum
tokio = { version = "1", features = ["rt-multi-thread"] }
hyper = { version = "0", features = ["http1", "server"] }
axum = { version = "0.6", features = ["default", "http2"] }
axum = { version = "0.7", features = ["default", "http2"] }
tower = { version = "0" } # introduced only for tower::layer::util::Stack
tower-http = { version = "0.4.4", features = ["cors", "trace"] }
tower-http = { version = "0.5", features = ["cors", "trace"] }
tower-layer = "0"
tower-service = "0"
tracing = "0"
@ -45,9 +45,9 @@ thiserror = "1"
snafu = "0"
# flight-sql
arrow-flight = { version = "52", features = ["flight-sql-experimental"] }
tonic = { version = "0.11", features = ["tls"] }
prost = "0.12"
arrow-flight = { version = "53", features = ["flight-sql-experimental"] }
tonic = { version = "0", features = ["tls"] }
prost = "0"
futures = "0"
# TODO: remove once_cell dependency
once_cell = "*"
@ -57,12 +57,12 @@ uuid = "1"
[dependencies.convergence]
version = "0"
git = "https://github.com/roapi/convergence.git"
rev = "40c5fca38d83611f6c941c9ffe86b597c2e5851b"
rev = "da158f04d09460ff9416f3328df419ce01dbae77"
[dependencies.convergence-arrow]
version = "0"
git = "https://github.com/roapi/convergence.git"
rev = "40c5fca38d83611f6c941c9ffe86b597c2e5851b"
rev = "da158f04d09460ff9416f3328df419ce01dbae77"
[features]
default = ["rustls", "snmalloc"]

View File

@ -143,12 +143,7 @@ impl fmt::Display for ApiErrResp {
impl axum::response::IntoResponse for ApiErrResp {
fn into_response(self) -> axum::response::Response {
let payload = serde_json::to_string(&self).expect("failed to serialize json into string");
let body = axum::body::boxed(axum::body::Full::from(payload));
// NOTE: uncomment for axum 0.7 upgrade
// let payload = serde_json::to_string(&self).expect("failed to serialize json into string");
// let body = axum::body::Body::from(payload);
let body = axum::body::Body::from(payload);
Response::builder().status(self.code).body(body).unwrap()
}
}

View File

@ -24,7 +24,7 @@ use arrow_flight::{
};
use async_trait::async_trait;
use base64::prelude::*;
use columnq::arrow_schema::Schema;
use columnq::datafusion::arrow::datatypes::Schema;
use columnq::datafusion::arrow::ipc::writer::IpcWriteOptions;
use columnq::datafusion::arrow::record_batch::RecordBatch;
use columnq::datafusion::logical_expr::LogicalPlan;

View File

@ -1,5 +1,4 @@
use std::collections::HashMap;
use std::net::TcpListener;
use std::sync::Arc;
use axum::extract::Extension;
@ -20,19 +19,14 @@ pub enum Error {
BindTcp { source: std::io::Error },
}
// NOTE: uncomment for axum 0.7 upgrade
// pub type HttpApiServe = axum::serve::Serve<axum::Router, axum::Router>;
pub type HttpApiServer =
axum::Server<hyper::server::conn::AddrIncoming, axum::routing::IntoMakeService<axum::Router>>;
pub type HttpApiServe = axum::serve::Serve<axum::Router, axum::Router>;
pub async fn build_http_server<H: RoapiContext>(
ctx_ext: Arc<H>,
tables: Arc<Mutex<HashMap<String, TableSource>>>,
config: &Config,
default_host: String,
// NOTE: uncomment for axum 0.7 upgrade
// ) -> Result<(HttpApiServe, std::net::SocketAddr), Error> {
) -> Result<(HttpApiServer, std::net::SocketAddr), Error> {
) -> Result<(HttpApiServe, std::net::SocketAddr), Error> {
let default_http_port = std::env::var("PORT").unwrap_or_else(|_| "8080".to_string());
let default_http_addr = [default_host, default_http_port].join(":");
let http_addr = config
@ -45,9 +39,7 @@ pub async fn build_http_server<H: RoapiContext>(
let mut app = routes.layer(Extension(ctx_ext));
let cors = tower_http::cors::CorsLayer::new()
// NOTE: uncomment for axum 0.7 upgrade
// .allow_methods([Method::GET, Method::POST, Method::OPTIONS])
.allow_methods(vec![Method::GET, Method::POST, Method::OPTIONS])
.allow_methods([Method::GET, Method::POST, Method::OPTIONS])
.allow_origin(tower_http::cors::Any)
.allow_credentials(false);
@ -58,19 +50,13 @@ pub async fn build_http_server<H: RoapiContext>(
app = app.layer(layers::HttpLoggerLayer::new());
}
// NOTE: uncomment for axum 0.7 upgrade
// let listener = tokio::net::TcpListener::bind(http_addr)
// .await
// .context(BindTcpSnafu)?;
let listener = TcpListener::bind(http_addr).context(BindTcpSnafu)?;
let listener = tokio::net::TcpListener::bind(http_addr)
.await
.context(BindTcpSnafu)?;
let addr = listener
.local_addr()
.expect("Failed to get address from listener");
// NOTE: uncomment for axum 0.7 upgrade
// let serve = axum::serve(listener, app);
// Ok((serve, addr))
let http_server = axum::Server::from_tcp(listener).unwrap();
let http_server = http_server.serve(app.into_make_service());
Ok((http_server, addr))
let serve = axum::serve(listener, app);
Ok((serve, addr))
}

View File

@ -49,7 +49,7 @@ impl TableReloader {
pub struct Application {
http_addr: std::net::SocketAddr,
http_server: server::http::HttpApiServer,
http_server: server::http::HttpApiServe,
table_reloader: Option<TableReloader>,
postgres_server: Box<dyn server::RunnableServer>,
flight_sql_server: Box<dyn server::RunnableServer>,

View File

@ -80,7 +80,7 @@ async fn test_sql_invalid_post() {
serde_json::json!({
"code": 400,
"error": "plan_sql",
"message": "Failed to plan execution from SQL query: SQL error: ParserError(\"Expected identifier, found: EOF\")"
"message": "Failed to plan execution from SQL query: SQL error: ParserError(\"Expected: identifier, found: EOF\")"
})
);
}

View File

@ -5,7 +5,7 @@ use arrow_flight::sql::client::FlightSqlServiceClient;
use arrow_flight::FlightInfo;
use arrow_ipc::convert::try_schema_from_ipc_buffer;
use columnq::arrow::datatypes::{DataType, Field};
use columnq::arrow_schema::ArrowError;
use columnq::arrow::error::ArrowError;
use columnq::datafusion::arrow;
use columnq::datafusion::arrow::record_batch::RecordBatch;
use columnq::table::TableSource;
@ -86,7 +86,7 @@ async fn test_flight_sql_spacex_aggregate() {
let res = pretty_format_batches(batches.as_slice()).unwrap();
let expected = r#"
+----------+--------------------------+
| COUNT(*) | launch_cnt |
| count(*) | launch_cnt |
+----------+--------------------------+
| 5 | 5e9d0d95eda69955f709d1eb |
| 122 | 5e9d0d95eda69973a809d1ec |

View File

@ -1,14 +1,13 @@
mod helpers;
use std::collections::HashMap;
use std::net::TcpListener;
use std::sync::Arc;
use axum::http::HeaderMap;
use axum::{extract::State, response::IntoResponse, routing::get};
async fn http_server() -> (
axum::Server<hyper::server::conn::AddrIncoming, axum::routing::IntoMakeService<axum::Router>>,
axum::serve::Serve<axum::Router, axum::Router>,
std::net::SocketAddr,
) {
async fn serve_json(
@ -29,15 +28,13 @@ async fn http_server() -> (
.route("/table.json", get(serve_json))
.with_state(state);
let listener = TcpListener::bind("localhost:0").unwrap();
let listener = tokio::net::TcpListener::bind("localhost:0").await.unwrap();
let addr = listener
.local_addr()
.expect("Failed to get address from listener");
let http_server = axum::Server::from_tcp(listener).unwrap();
let http_server = http_server.serve(app.into_make_service());
(http_server, addr)
let serve = axum::serve(listener, app);
(serve, addr)
}
#[tokio::test]