mirror of
https://github.com/roapi/roapi.git
synced 2026-06-05 21:04:02 +08:00
upgrade dependencies (#425)
This commit is contained in:
parent
5e5d55ee5f
commit
c5efa9716d
2
.github/workflows/build.yml
vendored
2
.github/workflows/build.yml
vendored
@ -8,7 +8,7 @@ on:
|
||||
|
||||
env:
|
||||
# NOTE: the version is also defined in columnq_cli_release.yml, roapi_release.yml and Dockerfile
|
||||
RUST_TC_VER: "1.86.0"
|
||||
RUST_TC_VER: "1.94.0"
|
||||
|
||||
jobs:
|
||||
build:
|
||||
|
||||
2
.github/workflows/columnq_cli_release.yml
vendored
2
.github/workflows/columnq_cli_release.yml
vendored
@ -12,7 +12,7 @@ on:
|
||||
|
||||
env:
|
||||
# NOTE: the version is also defined in build.yml, roapi_release.yml and Dockerfile
|
||||
RUST_TC_VER: "1.86.0"
|
||||
RUST_TC_VER: "1.94.0"
|
||||
|
||||
jobs:
|
||||
# skip tag version validation on non-release branch run
|
||||
|
||||
2
.github/workflows/roapi_release.yml
vendored
2
.github/workflows/roapi_release.yml
vendored
@ -12,7 +12,7 @@ on:
|
||||
|
||||
env:
|
||||
# NOTE: the version is also defined in build.yml, columnq_cli_release.yml and Dockerfile
|
||||
RUST_TC_VER: "1.86.0"
|
||||
RUST_TC_VER: "1.94.0"
|
||||
|
||||
jobs:
|
||||
validate-release-tag:
|
||||
|
||||
4847
Cargo.lock
generated
4847
Cargo.lock
generated
File diff suppressed because it is too large
Load Diff
@ -1,4 +1,4 @@
|
||||
ARG RUST_VER=1.86.0-bookworm
|
||||
ARG RUST_VER=1.94.0-bookworm
|
||||
ARG FEATURES="database,ui"
|
||||
|
||||
# Step 0: Install cargo-chef
|
||||
|
||||
@ -19,11 +19,11 @@ snmalloc-rs = { version = "0.3", optional = true }
|
||||
serde_json = "*"
|
||||
log = "0"
|
||||
tokio = "1"
|
||||
rustyline = { version = "9" }
|
||||
rustyline = { version = "17" }
|
||||
env_logger = { version = "0" }
|
||||
anyhow = { version = "1" }
|
||||
clap = { version = "4", features = ["color"] }
|
||||
dirs = { version = "4" }
|
||||
dirs = { version = "6" }
|
||||
|
||||
[features]
|
||||
default = ["rustls", "snmalloc"]
|
||||
|
||||
@ -1,7 +1,7 @@
|
||||
use anyhow::{anyhow, Context};
|
||||
use log::debug;
|
||||
use rustyline::error::ReadlineError;
|
||||
use rustyline::Editor;
|
||||
|
||||
use std::io::Write;
|
||||
use std::path::PathBuf;
|
||||
|
||||
@ -46,7 +46,7 @@ fn table_arg() -> clap::Arg {
|
||||
async fn console_loop(cq: &ColumnQ) -> anyhow::Result<()> {
|
||||
let rl_history = history_path()?;
|
||||
|
||||
let mut readline = Editor::<()>::new();
|
||||
let mut readline = rustyline::DefaultEditor::new()?;
|
||||
if let Err(e) = readline.load_history(&rl_history) {
|
||||
debug!("no query history loaded: {e:?}");
|
||||
}
|
||||
@ -54,7 +54,7 @@ async fn console_loop(cq: &ColumnQ) -> anyhow::Result<()> {
|
||||
loop {
|
||||
match readline.readline("columnq(sql)> ") {
|
||||
Ok(line) => {
|
||||
readline.add_history_entry(line.as_str());
|
||||
let _ = readline.add_history_entry(line.as_str());
|
||||
match line.as_ref() {
|
||||
"exit" | "quit" | "q" => {
|
||||
println!("Good bye!");
|
||||
|
||||
@ -1,6 +1,6 @@
|
||||
[package]
|
||||
name = "columnq"
|
||||
version = "0.9.4"
|
||||
version = "0.10.0"
|
||||
homepage = "https://github.com/roapi/roapi"
|
||||
license = "MIT"
|
||||
authors = ["QP Hou <dave2008713@gmail.com>"]
|
||||
@ -24,50 +24,52 @@ bytes = { version = "1" }
|
||||
percent-encoding = "2.3"
|
||||
|
||||
# datafusion
|
||||
datafusion = { version = "47", features = ["serde"] }
|
||||
arrow = { version = "55", features = ["prettyprint"] }
|
||||
datafusion = { version = "52", features = ["serde"] }
|
||||
arrow = { version = "57", features = ["prettyprint", "ffi"] }
|
||||
|
||||
# spreadsheets reader
|
||||
calamine = { version = "0.23.1", features = ["dates"] }
|
||||
calamine = { version = "0.34", features = ["chrono"] }
|
||||
|
||||
# graphql
|
||||
graphql-parser = "0.4"
|
||||
|
||||
# async
|
||||
tokio = { version = "1", features = ["rt-multi-thread"] }
|
||||
chrono = "0.4"
|
||||
futures = "0.3"
|
||||
|
||||
# net
|
||||
reqwest = { version = "0", default-features = false, features = [
|
||||
reqwest = { version = "0.12", default-features = false, features = [
|
||||
"blocking",
|
||||
"json",
|
||||
] }
|
||||
hyper-tls = { version = "0", default-features = false, optional = true }
|
||||
hyper-rustls = { version = "0", default-features = false, optional = true }
|
||||
hyper-tls = { version = "0.6", default-features = false, optional = true }
|
||||
hyper-rustls = { version = "0.27", default-features = false, optional = true }
|
||||
|
||||
object_store = { version = "0.12", features = ["http", "aws", "gcp", "azure"] }
|
||||
tokio-postgres = { version = "0.7.12", optional = true }
|
||||
deltalake = { version = "0.26", features = [
|
||||
deltalake = { version = "0.31", features = [
|
||||
"datafusion",
|
||||
"datafusion-ext",
|
||||
"s3",
|
||||
"gcs",
|
||||
"azure",
|
||||
] }
|
||||
yup-oauth2 = { version = "10", default-features = false, features = [
|
||||
yup-oauth2 = { version = "12", default-features = false, features = [
|
||||
"service_account",
|
||||
"aws-lc-rs",
|
||||
] }
|
||||
|
||||
[dependencies.connectorx]
|
||||
git = "https://github.com/roapi/connector-x.git"
|
||||
rev = "77e769ec5d8654a5f6912573a70d56ac8b7f7a50"
|
||||
rev = "7d3e0a90c83934754a81b4ae9cc5ae4b0699d39c"
|
||||
version = "0.3.3-alpha.1"
|
||||
features = ["default", "dst_arrow"]
|
||||
optional = true
|
||||
|
||||
[dev-dependencies]
|
||||
serde_yaml = "0.9"
|
||||
toml = "0.7"
|
||||
toml = "1"
|
||||
tempfile = "3.3.0"
|
||||
pretty_assertions = "*"
|
||||
dotenvy = "*"
|
||||
|
||||
@ -79,19 +79,21 @@ fn to_datafusion_sort_columns(sort_columns: &[Value<String>]) -> Result<Vec<Sort
|
||||
|
||||
fn operand_to_datafusion_expr(operand: &Value<String>) -> Result<Expr, QueryError> {
|
||||
match operand {
|
||||
Value::Boolean(b) => Ok(Expr::Literal(ScalarValue::Boolean(Some(*b)))),
|
||||
Value::String(s) => Ok(Expr::Literal(ScalarValue::Utf8(Some(s.to_string())))),
|
||||
Value::Boolean(b) => Ok(Expr::Literal(ScalarValue::Boolean(Some(*b)), None)),
|
||||
Value::String(s) => Ok(Expr::Literal(ScalarValue::Utf8(Some(s.to_string())), None)),
|
||||
// GraphQL only supports int32 scalar input: http://spec.graphql.org/June2018/#sec-Int, but
|
||||
// graphql crate only supports int64.
|
||||
// TODO: set literal value type based on schema?
|
||||
Value::Int(n) => Ok(Expr::Literal(ScalarValue::Int64(Some(
|
||||
n.as_i64().ok_or_else(|| {
|
||||
invalid_query(format!(
|
||||
"invalid integer number in filter predicate: {operand}"
|
||||
))
|
||||
})?,
|
||||
)))),
|
||||
Value::Float(f) => Ok(Expr::Literal(ScalarValue::Float64(Some(f.to_owned())))),
|
||||
Value::Int(n) => Ok(Expr::Literal(
|
||||
ScalarValue::Int64(Some(n.as_i64().ok_or_else(|| {
|
||||
invalid_query(format!("invalid integer number in filter: {n:?}"))
|
||||
})?)),
|
||||
None,
|
||||
)),
|
||||
Value::Float(f) => Ok(Expr::Literal(
|
||||
ScalarValue::Float64(Some(f.to_owned())),
|
||||
None,
|
||||
)),
|
||||
other => Err(invalid_query(format!(
|
||||
"invalid operand in filter predicate: {other}",
|
||||
))),
|
||||
|
||||
@ -29,13 +29,13 @@ fn rest_query_value_to_expr(v: &str) -> Result<Expr, QueryError> {
|
||||
match t {
|
||||
// TODO: support column expr instead of just literal
|
||||
sqlparser::tokenizer::Token::SingleQuotedString(s) => {
|
||||
Ok(Expr::Literal(ScalarValue::Utf8(Some(s.to_string()))))
|
||||
Ok(Expr::Literal(ScalarValue::Utf8(Some(s.to_string())), None))
|
||||
}
|
||||
sqlparser::tokenizer::Token::Number(s, _) => {
|
||||
if let Ok(n) = s.parse() {
|
||||
Ok(Expr::Literal(ScalarValue::Int64(Some(n))))
|
||||
Ok(Expr::Literal(ScalarValue::Int64(Some(n)), None))
|
||||
} else if let Ok(n) = s.parse() {
|
||||
Ok(Expr::Literal(ScalarValue::Float64(Some(n))))
|
||||
Ok(Expr::Literal(ScalarValue::Float64(Some(n)), None))
|
||||
} else {
|
||||
Err(QueryError {
|
||||
error: "rest_query_value".to_string(),
|
||||
@ -251,7 +251,10 @@ mod tests {
|
||||
.table("ubuntu_ami")
|
||||
.await
|
||||
.unwrap()
|
||||
.filter(col("arch").eq(Expr::Literal(ScalarValue::Utf8(Some("amd64".to_string())))))
|
||||
.filter(col("arch").eq(Expr::Literal(
|
||||
ScalarValue::Utf8(Some("amd64".to_string())),
|
||||
None,
|
||||
)))
|
||||
.unwrap()
|
||||
.select(vec![col("ami_id"), col("version")])
|
||||
.unwrap()
|
||||
|
||||
@ -131,7 +131,7 @@ mod tests {
|
||||
.scan(&ctx.state(), None, &[], None)
|
||||
.await
|
||||
.unwrap()
|
||||
.statistics()
|
||||
.partition_statistics(None)
|
||||
.unwrap();
|
||||
assert_eq!(stats.num_rows, Precision::Exact(37 * 3));
|
||||
}
|
||||
@ -149,7 +149,7 @@ mod tests {
|
||||
.scan(&ctx.state(), None, &[], None)
|
||||
.await
|
||||
.unwrap()
|
||||
.statistics()
|
||||
.partition_statistics(None)
|
||||
.unwrap();
|
||||
assert_eq!(stats.num_rows, Precision::Exact(37));
|
||||
}
|
||||
|
||||
@ -131,7 +131,7 @@ mod tests {
|
||||
.scan(&ctx.state(), None, &[], None)
|
||||
.await
|
||||
.unwrap()
|
||||
.statistics()
|
||||
.partition_statistics(None)
|
||||
.unwrap();
|
||||
assert_eq!(stats.num_rows, Precision::Exact(37 * 3));
|
||||
}
|
||||
@ -149,7 +149,7 @@ mod tests {
|
||||
.scan(&ctx.state(), None, &[], None)
|
||||
.await
|
||||
.unwrap()
|
||||
.statistics()
|
||||
.partition_statistics(None)
|
||||
.unwrap();
|
||||
assert_eq!(stats.num_rows, Precision::Exact(37));
|
||||
}
|
||||
|
||||
@ -212,7 +212,7 @@ mod tests {
|
||||
.scan(&ctx.state(), None, &[], None)
|
||||
.await
|
||||
.unwrap()
|
||||
.statistics()
|
||||
.partition_statistics(None)
|
||||
.unwrap();
|
||||
assert_eq!(stats.num_rows, Precision::Exact(37 * 3));
|
||||
}
|
||||
@ -238,7 +238,7 @@ c1,c2,c3
|
||||
.scan(&ctx.state(), None, &[], None)
|
||||
.await
|
||||
.unwrap()
|
||||
.statistics()
|
||||
.partition_statistics(None)
|
||||
.unwrap();
|
||||
assert_eq!(stats.num_rows, Precision::Exact(3));
|
||||
}
|
||||
|
||||
@ -59,16 +59,16 @@ mod imp {
|
||||
.map_err(Box::new)
|
||||
.context(table::LoadDatabaseSnafu)?;
|
||||
|
||||
datafusion::datasource::MemTable::try_new(
|
||||
destination.arrow_schema(),
|
||||
vec![destination
|
||||
.arrow()
|
||||
.context(ToArrowSnafu)
|
||||
.map_err(Box::new)
|
||||
.context(table::LoadDatabaseSnafu)?],
|
||||
)
|
||||
.map_err(Box::new)
|
||||
.context(table::CreateMemTableSnafu)
|
||||
let schema = destination.arrow_schema();
|
||||
let batches = destination
|
||||
.arrow()
|
||||
.context(ToArrowSnafu)
|
||||
.map_err(Box::new)
|
||||
.context(table::LoadDatabaseSnafu)?;
|
||||
|
||||
datafusion::datasource::MemTable::try_new(schema, vec![batches])
|
||||
.map_err(Box::new)
|
||||
.context(table::CreateMemTableSnafu)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -7,6 +7,8 @@ use datafusion::arrow::record_batch::RecordBatch;
|
||||
use datafusion::datasource::TableProvider;
|
||||
use datafusion::parquet::arrow::arrow_reader::ArrowReaderOptions;
|
||||
use datafusion::parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
|
||||
use deltalake::DeltaTableBuilder;
|
||||
use object_store::DynObjectStore;
|
||||
|
||||
use crate::io::{self, BlobStoreType};
|
||||
use crate::table::{self, LoadedTable, TableLoadOption, TableOptionDelta, TableSource};
|
||||
@ -51,19 +53,52 @@ pub enum Error {
|
||||
UpdateTable {
|
||||
source: deltalake::errors::DeltaTableError,
|
||||
},
|
||||
#[snafu(display("Failed to create memory table: {source}"))]
|
||||
CreateMemTable {
|
||||
source: datafusion::error::DataFusionError,
|
||||
},
|
||||
}
|
||||
|
||||
async fn update_table(
|
||||
mut t: deltalake::DeltaTable,
|
||||
) -> Result<Arc<dyn TableProvider>, table::Error> {
|
||||
t
|
||||
// TODO: find a way to not do a full table update?
|
||||
t = t
|
||||
.update()
|
||||
.await
|
||||
.map(|(t, _)| t)
|
||||
.context(UpdateTableSnafu)
|
||||
.map_err(Box::new)
|
||||
.context(table::LoadDeltaSnafu)?;
|
||||
Ok(Arc::new(t) as Arc<dyn TableProvider>)
|
||||
t.table_provider()
|
||||
.await
|
||||
.map_err(|e| table::Error::LoadDelta {
|
||||
source: Box::new(Error::UpdateTable {
|
||||
source: deltalake::errors::DeltaTableError::Generic(e.to_string()),
|
||||
}),
|
||||
})
|
||||
}
|
||||
|
||||
/// Resolve the table URL and look up the object store that was already
|
||||
/// registered in DataFusion's runtime env. Returns `None` for local
|
||||
/// filesystem paths (no store registration needed).
|
||||
fn get_registered_store(
|
||||
uri_str: &str,
|
||||
dfctx: &datafusion::execution::context::SessionContext,
|
||||
) -> Result<Option<(url::Url, Arc<DynObjectStore>)>, table::Error> {
|
||||
if !uri_str.contains("://") {
|
||||
// local filesystem – no registered store
|
||||
return Ok(None);
|
||||
}
|
||||
let url = url::Url::parse(uri_str).map_err(|_| table::Error::LoadDelta {
|
||||
source: Box::new(Error::OpenTable {
|
||||
source: deltalake::errors::DeltaTableError::InvalidTableLocation(uri_str.to_string()),
|
||||
}),
|
||||
})?;
|
||||
match dfctx.runtime_env().object_store_registry.get_store(&url) {
|
||||
Ok(store) => Ok(Some((url, store))),
|
||||
// If not registered yet, fall back to delta-rs default resolution
|
||||
Err(_) => Ok(None),
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn to_loaded_table(
|
||||
@ -78,16 +113,42 @@ pub async fn to_loaded_table(
|
||||
let TableOptionDelta { use_memory_table } = opt.as_delta()?;
|
||||
|
||||
let uri_str = t.get_uri_str();
|
||||
let delta_table = deltalake::DeltaTableBuilder::from_valid_uri(uri_str)
|
||||
.context(OpenTableSnafu)
|
||||
.map_err(Box::new)
|
||||
.context(table::LoadDeltaSnafu)?
|
||||
.with_allow_http(true)
|
||||
.load()
|
||||
.await
|
||||
.context(LoadTableSnafu)
|
||||
.map_err(Box::new)
|
||||
.context(table::LoadDeltaSnafu)?;
|
||||
|
||||
// Build the URL (handle both remote URIs and bare filesystem paths).
|
||||
let url = if uri_str.contains("://") {
|
||||
url::Url::parse(uri_str).ok()
|
||||
} else {
|
||||
let abs_path =
|
||||
std::fs::canonicalize(uri_str).unwrap_or_else(|_| std::path::PathBuf::from(uri_str));
|
||||
url::Url::from_file_path(abs_path).ok()
|
||||
}
|
||||
.ok_or_else(|| table::Error::LoadDelta {
|
||||
source: Box::new(Error::OpenTable {
|
||||
source: deltalake::errors::DeltaTableError::InvalidTableLocation(uri_str.to_string()),
|
||||
}),
|
||||
})?;
|
||||
|
||||
// Reuse the object store that is already registered in DataFusion's
|
||||
// runtime env so that settings like `allow_http = true` apply uniformly
|
||||
// to both DataFusion scans and delta-rs metadata reads.
|
||||
let delta_table = match get_registered_store(uri_str, dfctx)? {
|
||||
Some((store_url, store)) => DeltaTableBuilder::from_url(url.clone())
|
||||
.context(OpenTableSnafu)
|
||||
.map_err(Box::new)
|
||||
.context(table::LoadDeltaSnafu)?
|
||||
.with_storage_backend(store, store_url)
|
||||
.load()
|
||||
.await
|
||||
.context(OpenTableSnafu)
|
||||
.map_err(Box::new)
|
||||
.context(table::LoadDeltaSnafu)?,
|
||||
None => deltalake::open_table(url.clone())
|
||||
.await
|
||||
.context(OpenTableSnafu)
|
||||
.map_err(Box::new)
|
||||
.context(table::LoadDeltaSnafu)?,
|
||||
};
|
||||
|
||||
let parsed_uri = t.parsed_uri()?;
|
||||
let url_scheme = parsed_uri.scheme();
|
||||
let blob_type = BlobStoreType::try_from(url_scheme).context(table::IoSnafu)?;
|
||||
@ -106,7 +167,7 @@ pub async fn to_loaded_table(
|
||||
LoadedTable::new_from_df_table_cb(to_datafusion_table).await
|
||||
} else {
|
||||
let curr_table = delta_table.clone();
|
||||
let df_table = cast_datafusion_table(delta_table, blob_type)?;
|
||||
let df_table = cast_datafusion_table(delta_table, blob_type).await?;
|
||||
Ok(LoadedTable::new(
|
||||
df_table,
|
||||
Box::new(move || {
|
||||
@ -117,7 +178,7 @@ pub async fn to_loaded_table(
|
||||
}
|
||||
}
|
||||
|
||||
fn cast_datafusion_table(
|
||||
async fn cast_datafusion_table(
|
||||
delta_table: deltalake::DeltaTable,
|
||||
blob_type: io::BlobStoreType,
|
||||
) -> Result<Arc<dyn TableProvider>, table::Error> {
|
||||
@ -125,9 +186,18 @@ fn cast_datafusion_table(
|
||||
io::BlobStoreType::Azure
|
||||
| io::BlobStoreType::S3
|
||||
| io::BlobStoreType::GCS
|
||||
| io::BlobStoreType::FileSystem => Ok(Arc::new(delta_table)),
|
||||
| io::BlobStoreType::FileSystem => {
|
||||
Ok(delta_table
|
||||
.table_provider()
|
||||
.await
|
||||
.map_err(|e| table::Error::LoadDelta {
|
||||
source: Box::new(Error::LoadTable {
|
||||
source: deltalake::errors::DeltaTableError::Generic(e.to_string()),
|
||||
}),
|
||||
})?)
|
||||
}
|
||||
_ => Err(Box::new(Error::InvalidUri {
|
||||
uri: delta_table.table_uri().to_string(),
|
||||
uri: delta_table.table_url().to_string(),
|
||||
}))
|
||||
.context(table::LoadDeltaSnafu),
|
||||
}
|
||||
@ -170,21 +240,15 @@ pub async fn to_mem_table(
|
||||
) -> Result<Arc<dyn TableProvider>, table::Error> {
|
||||
let paths = delta_table
|
||||
.get_file_uris()
|
||||
.context(LoadTableSnafu)
|
||||
.map_err(Box::new)
|
||||
.context(table::LoadDeltaSnafu)?
|
||||
.map_err(|e| table::Error::LoadDelta {
|
||||
source: Box::new(Error::LoadTable { source: e }),
|
||||
})?
|
||||
.collect::<Vec<String>>();
|
||||
if paths.is_empty() {
|
||||
return Err(Box::new(Error::EmptyTable {})).context(table::LoadDeltaSnafu);
|
||||
}
|
||||
|
||||
let delta_schema = delta_table
|
||||
.get_schema()
|
||||
.context(GetSchemaSnafu)
|
||||
.map_err(Box::new)
|
||||
.context(table::LoadDeltaSnafu)?;
|
||||
|
||||
let path_iter = paths.iter().map(|s| s.as_ref());
|
||||
let path_iter = paths.iter().map(|s: &String| s.as_str());
|
||||
|
||||
let partitions: Vec<Vec<RecordBatch>> = match blob_type {
|
||||
io::BlobStoreType::FileSystem => io::fs::partitions_from_iterator(
|
||||
@ -207,26 +271,29 @@ pub async fn to_mem_table(
|
||||
}
|
||||
_ => {
|
||||
return Err(Box::new(Error::InvalidUri {
|
||||
uri: delta_table.table_uri().to_string(),
|
||||
uri: delta_table.table_url().to_string(),
|
||||
}))
|
||||
.context(table::LoadDeltaSnafu);
|
||||
}
|
||||
};
|
||||
|
||||
Ok(Arc::new(
|
||||
datafusion::datasource::MemTable::try_new(
|
||||
Arc::new(
|
||||
delta_schema
|
||||
.try_into()
|
||||
.context(ConvertSchemaSnafu)
|
||||
.map_err(Box::new)
|
||||
.context(table::LoadDeltaSnafu)?,
|
||||
),
|
||||
partitions,
|
||||
)
|
||||
.map_err(Box::new)
|
||||
.context(table::CreateMemTableSnafu)?,
|
||||
))
|
||||
let mem_table = datafusion::datasource::MemTable::try_new(
|
||||
delta_table
|
||||
.table_provider()
|
||||
.await
|
||||
.map_err(|e| table::Error::LoadDelta {
|
||||
source: Box::new(Error::GetSchema {
|
||||
source: deltalake::errors::DeltaTableError::Generic(e.to_string()),
|
||||
}),
|
||||
})?
|
||||
.schema(),
|
||||
partitions,
|
||||
)
|
||||
.map_err(|e| table::Error::LoadDelta {
|
||||
source: Box::new(Error::CreateMemTable { source: e }),
|
||||
})?;
|
||||
|
||||
Ok(Arc::new(mem_table))
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
@ -238,8 +305,6 @@ mod tests {
|
||||
use datafusion::physical_plan::Statistics;
|
||||
use datafusion::prelude::SessionContext;
|
||||
|
||||
use deltalake::DeltaTable;
|
||||
|
||||
use crate::test_util::test_data_path;
|
||||
|
||||
#[tokio::test]
|
||||
@ -261,7 +326,7 @@ mod tests {
|
||||
t.scan(&ctx.state(), None, &[], None)
|
||||
.await
|
||||
.unwrap()
|
||||
.statistics()
|
||||
.partition_statistics(None)
|
||||
.unwrap(),
|
||||
);
|
||||
|
||||
@ -273,7 +338,7 @@ mod tests {
|
||||
#[tokio::test]
|
||||
async fn load_delta_as_delta_source() {
|
||||
let ctx = SessionContext::new();
|
||||
let t = to_loaded_table(
|
||||
let _t = to_loaded_table(
|
||||
&TableSource::new("blogs".to_string(), test_data_path("blogs-delta")).with_option(
|
||||
TableLoadOption::delta(TableOptionDelta {
|
||||
use_memory_table: false,
|
||||
@ -284,13 +349,6 @@ mod tests {
|
||||
.await
|
||||
.unwrap()
|
||||
.table;
|
||||
|
||||
match t.as_any().downcast_ref::<DeltaTable>() {
|
||||
Some(delta_table) => {
|
||||
assert_eq!(delta_table.version(), 0);
|
||||
}
|
||||
None => panic!("must be of type deltalake::DeltaTable"),
|
||||
}
|
||||
}
|
||||
|
||||
fn validate_statistics(stats: Statistics) {
|
||||
|
||||
@ -1,4 +1,6 @@
|
||||
use calamine::{open_workbook_auto, DataType as ExcelDataType, Range, Reader, Sheets};
|
||||
use calamine::{
|
||||
open_workbook_auto, Data as ExcelData, DataType as CalamineDataType, Range, Reader, Sheets,
|
||||
};
|
||||
use datafusion::arrow::array::{
|
||||
ArrayRef, BooleanArray, DurationSecondArray, NullArray, PrimitiveArray, StringArray,
|
||||
TimestampSecondArray,
|
||||
@ -32,7 +34,7 @@ pub enum Error {
|
||||
}
|
||||
|
||||
struct ExcelSubrange<'a> {
|
||||
rows: calamine::Rows<'a, ExcelDataType>,
|
||||
rows: calamine::Rows<'a, ExcelData>,
|
||||
columns_range_start: usize,
|
||||
columns_range_end: usize,
|
||||
total_rows: usize,
|
||||
@ -41,7 +43,7 @@ struct ExcelSubrange<'a> {
|
||||
|
||||
impl<'a> ExcelSubrange<'a> {
|
||||
fn new(
|
||||
range: &'a Range<ExcelDataType>,
|
||||
range: &'a Range<ExcelData>,
|
||||
rows_range_start: Option<usize>,
|
||||
rows_range_end: Option<usize>,
|
||||
columns_range_start: Option<usize>,
|
||||
@ -73,7 +75,7 @@ impl<'a> ExcelSubrange<'a> {
|
||||
}
|
||||
|
||||
impl<'a> Iterator for ExcelSubrange<'a> {
|
||||
type Item = &'a [ExcelDataType];
|
||||
type Item = &'a [ExcelData];
|
||||
|
||||
fn next(&mut self) -> Option<Self::Item> {
|
||||
if self.current_row_id < self.total_rows {
|
||||
@ -91,20 +93,38 @@ impl<'a> Iterator for ExcelSubrange<'a> {
|
||||
}
|
||||
}
|
||||
|
||||
fn infer_value_type(v: &ExcelDataType) -> Result<DataType, Error> {
|
||||
fn infer_value_type(v: &ExcelData) -> Result<DataType, Error> {
|
||||
match v {
|
||||
ExcelDataType::Int(_) => Ok(DataType::Int64),
|
||||
ExcelDataType::Float(_) => Ok(DataType::Float64),
|
||||
ExcelDataType::String(_) => Ok(DataType::Utf8),
|
||||
ExcelDataType::Bool(_) => Ok(DataType::Boolean),
|
||||
ExcelDataType::DateTime(_) | ExcelDataType::DateTimeIso(_) => {
|
||||
ExcelData::Int(_) => Ok(DataType::Int64),
|
||||
ExcelData::Float(_) => Ok(DataType::Float64),
|
||||
ExcelData::String(_) => Ok(DataType::Utf8),
|
||||
ExcelData::Bool(_) => Ok(DataType::Boolean),
|
||||
ExcelData::DateTime(_) | ExcelData::DateTimeIso(_) => {
|
||||
Ok(DataType::Timestamp(TimeUnit::Second, None))
|
||||
}
|
||||
ExcelDataType::Duration(_) | ExcelDataType::DurationIso(_) => {
|
||||
Ok(DataType::Duration(TimeUnit::Second))
|
||||
ExcelData::Error(e) => Err(Error::Load {
|
||||
msg: format!("{:?}", e),
|
||||
}),
|
||||
ExcelData::Empty => Ok(DataType::Null),
|
||||
_ => {
|
||||
if v.as_f64().is_some() {
|
||||
Ok(DataType::Float64)
|
||||
} else if v.as_i64().is_some() {
|
||||
Ok(DataType::Int64)
|
||||
} else if v.as_string().is_some() {
|
||||
Ok(DataType::Utf8)
|
||||
} else if v.get_bool().is_some() {
|
||||
Ok(DataType::Boolean)
|
||||
} else if v.as_datetime().is_some() {
|
||||
Ok(DataType::Timestamp(TimeUnit::Second, None))
|
||||
} else if v.as_duration().is_some() {
|
||||
Ok(DataType::Duration(TimeUnit::Second))
|
||||
} else {
|
||||
Err(Error::Load {
|
||||
msg: format!("unsupported excel data type: {:?}", v),
|
||||
})
|
||||
}
|
||||
}
|
||||
ExcelDataType::Error(e) => Err(Error::Load { msg: e.to_string() }),
|
||||
ExcelDataType::Empty => Ok(DataType::Null),
|
||||
}
|
||||
}
|
||||
|
||||
@ -145,13 +165,13 @@ fn infer_schema_from_data(mut range: ExcelSubrange) -> Result<Schema, Error> {
|
||||
*ct = DataType::Utf8;
|
||||
}
|
||||
})
|
||||
.or_insert(col_type);
|
||||
.or_insert_with(|| col_type.clone());
|
||||
}
|
||||
}
|
||||
|
||||
let fields: Vec<Field> = col_names
|
||||
.iter()
|
||||
.map(|col_name| {
|
||||
.map(|col_name: &&str| {
|
||||
let dt = col_types.get(col_name).unwrap_or(&DataType::Utf8).clone();
|
||||
Field::new(col_name.replace(' ', "_"), dt, true)
|
||||
})
|
||||
@ -189,7 +209,7 @@ fn infer_schema_from_config(table_schema: &TableSchema) -> Result<Schema, Error>
|
||||
}
|
||||
}
|
||||
|
||||
fn empty_or_panic<T>(v: &ExcelDataType, field_name: &String) -> Option<T> {
|
||||
fn empty_or_panic<T>(v: &ExcelData, field_name: &String) -> Option<T> {
|
||||
if v.is_empty() {
|
||||
None
|
||||
} else {
|
||||
@ -198,7 +218,7 @@ fn empty_or_panic<T>(v: &ExcelDataType, field_name: &String) -> Option<T> {
|
||||
}
|
||||
|
||||
fn infer_schema(
|
||||
r: &Range<ExcelDataType>,
|
||||
r: &Range<ExcelData>,
|
||||
option: &TableOptionExcel,
|
||||
schema: &Option<TableSchema>,
|
||||
) -> Result<Schema, Error> {
|
||||
@ -230,7 +250,7 @@ fn infer_schema(
|
||||
}
|
||||
|
||||
fn excel_range_to_record_batch(
|
||||
r: Range<ExcelDataType>,
|
||||
r: Range<ExcelData>,
|
||||
option: &TableOptionExcel,
|
||||
schema: Schema,
|
||||
) -> Result<RecordBatch, Error> {
|
||||
@ -282,7 +302,7 @@ fn excel_range_to_record_batch(
|
||||
rows.map(|r| {
|
||||
r.get(i).and_then(|v| {
|
||||
v.as_duration()
|
||||
.map(|v| v.num_seconds())
|
||||
.map(|v: chrono::Duration| v.num_seconds())
|
||||
.or_else(|| empty_or_panic(v, field_name))
|
||||
})
|
||||
})
|
||||
@ -292,18 +312,19 @@ fn excel_range_to_record_batch(
|
||||
DataType::Utf8 => Arc::new(
|
||||
rows.map(|r| {
|
||||
r.get(i).and_then(|v| match v {
|
||||
ExcelDataType::Bool(x) => Some(x.to_string()),
|
||||
ExcelDataType::Float(_)
|
||||
| ExcelDataType::Int(_)
|
||||
| ExcelDataType::String(_) => v.as_string(),
|
||||
ExcelDataType::DateTime(_) | ExcelDataType::DateTimeIso(_) => {
|
||||
v.as_datetime().map(|x| x.to_string())
|
||||
ExcelData::Bool(x) => Some(x.to_string()),
|
||||
ExcelData::Float(_) | ExcelData::Int(_) | ExcelData::String(_) => {
|
||||
v.as_string()
|
||||
}
|
||||
ExcelDataType::Duration(_) | ExcelDataType::DurationIso(_) => {
|
||||
v.as_duration().map(|x| x.to_string())
|
||||
ExcelData::DateTime(_) | ExcelData::DateTimeIso(_) => v
|
||||
.as_datetime()
|
||||
.map(|x: chrono::NaiveDateTime| x.to_string()),
|
||||
_ if v.as_duration().is_some() => {
|
||||
v.as_duration().map(|x: chrono::Duration| x.to_string())
|
||||
}
|
||||
ExcelDataType::Empty => None,
|
||||
ExcelDataType::Error(e) => Some(e.to_string()),
|
||||
ExcelData::Empty => None,
|
||||
ExcelData::Error(e) => Some(format!("{:?}", e)),
|
||||
_ => v.as_string(),
|
||||
})
|
||||
})
|
||||
.collect::<StringArray>(),
|
||||
@ -312,7 +333,7 @@ fn excel_range_to_record_batch(
|
||||
rows.map(|r| {
|
||||
r.get(i).and_then(|v| {
|
||||
v.as_datetime()
|
||||
.map(|v| v.and_utc().timestamp())
|
||||
.map(|v: chrono::NaiveDateTime| v.and_utc().timestamp())
|
||||
.or_else(|| empty_or_panic(v, field_name))
|
||||
})
|
||||
})
|
||||
@ -407,23 +428,23 @@ mod tests {
|
||||
use datafusion::datasource::TableProvider;
|
||||
use datafusion::prelude::SessionContext;
|
||||
|
||||
use calamine::{Cell, DataType as ExcelDataType};
|
||||
use calamine::{Cell, Data as ExcelData};
|
||||
|
||||
#[test]
|
||||
fn excel_subrange_iteration() {
|
||||
let range = calamine::Range::<ExcelDataType>::from_sparse(vec![
|
||||
Cell::new((0, 0), ExcelDataType::Int(0)),
|
||||
Cell::new((0, 1), ExcelDataType::Bool(true)),
|
||||
Cell::new((0, 2), ExcelDataType::Float(0.333)),
|
||||
Cell::new((1, 0), ExcelDataType::Int(1)),
|
||||
Cell::new((1, 1), ExcelDataType::Bool(false)),
|
||||
Cell::new((1, 2), ExcelDataType::Float(1.333)),
|
||||
Cell::new((2, 0), ExcelDataType::Int(2)),
|
||||
Cell::new((2, 1), ExcelDataType::Empty),
|
||||
Cell::new((2, 2), ExcelDataType::Float(2.333)),
|
||||
Cell::new((3, 0), ExcelDataType::Int(3)),
|
||||
Cell::new((3, 1), ExcelDataType::Bool(true)),
|
||||
Cell::new((3, 2), ExcelDataType::Float(3.333)),
|
||||
let range = calamine::Range::<ExcelData>::from_sparse(vec![
|
||||
Cell::new((0, 0), ExcelData::Int(0)),
|
||||
Cell::new((0, 1), ExcelData::Bool(true)),
|
||||
Cell::new((0, 2), ExcelData::Float(0.333)),
|
||||
Cell::new((1, 0), ExcelData::Int(1)),
|
||||
Cell::new((1, 1), ExcelData::Bool(false)),
|
||||
Cell::new((1, 2), ExcelData::Float(1.333)),
|
||||
Cell::new((2, 0), ExcelData::Int(2)),
|
||||
Cell::new((2, 1), ExcelData::Empty),
|
||||
Cell::new((2, 2), ExcelData::Float(2.333)),
|
||||
Cell::new((3, 0), ExcelData::Int(3)),
|
||||
Cell::new((3, 1), ExcelData::Bool(true)),
|
||||
Cell::new((3, 2), ExcelData::Float(3.333)),
|
||||
]);
|
||||
let mut subrange = ExcelSubrange::new(&range, None, None, None, None);
|
||||
assert_eq!(subrange.size(), 4);
|
||||
@ -431,9 +452,9 @@ mod tests {
|
||||
subrange.next(),
|
||||
Some(
|
||||
&vec![
|
||||
ExcelDataType::Int(0),
|
||||
ExcelDataType::Bool(true),
|
||||
ExcelDataType::Float(0.333)
|
||||
ExcelData::Int(0),
|
||||
ExcelData::Bool(true),
|
||||
ExcelData::Float(0.333)
|
||||
][..]
|
||||
)
|
||||
);
|
||||
@ -441,29 +462,23 @@ mod tests {
|
||||
subrange.next(),
|
||||
Some(
|
||||
&vec![
|
||||
ExcelDataType::Int(1),
|
||||
ExcelDataType::Bool(false),
|
||||
ExcelDataType::Float(1.333)
|
||||
ExcelData::Int(1),
|
||||
ExcelData::Bool(false),
|
||||
ExcelData::Float(1.333)
|
||||
][..]
|
||||
)
|
||||
);
|
||||
assert_eq!(
|
||||
subrange.next(),
|
||||
Some(
|
||||
&vec![
|
||||
ExcelDataType::Int(2),
|
||||
ExcelDataType::Empty,
|
||||
ExcelDataType::Float(2.333)
|
||||
][..]
|
||||
)
|
||||
Some(&vec![ExcelData::Int(2), ExcelData::Empty, ExcelData::Float(2.333)][..])
|
||||
);
|
||||
assert_eq!(
|
||||
subrange.next(),
|
||||
Some(
|
||||
&vec![
|
||||
ExcelDataType::Int(3),
|
||||
ExcelDataType::Bool(true),
|
||||
ExcelDataType::Float(3.333)
|
||||
ExcelData::Int(3),
|
||||
ExcelData::Bool(true),
|
||||
ExcelData::Float(3.333)
|
||||
][..]
|
||||
)
|
||||
);
|
||||
@ -471,51 +486,52 @@ mod tests {
|
||||
|
||||
let mut subrange = ExcelSubrange::new(&range, Some(1), Some(2), Some(1), Some(1));
|
||||
assert_eq!(subrange.size(), 2);
|
||||
assert_eq!(subrange.next(), Some(&vec![ExcelDataType::Bool(false)][..]));
|
||||
assert_eq!(subrange.next(), Some(&vec![ExcelDataType::Empty][..]));
|
||||
assert_eq!(subrange.next(), Some(&vec![ExcelData::Bool(false)][..]));
|
||||
assert_eq!(subrange.next(), Some(&vec![ExcelData::Empty][..]));
|
||||
assert_eq!(subrange.next(), None);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn inferes_schema_from_data() {
|
||||
let range = calamine::Range::<ExcelDataType>::from_sparse(vec![
|
||||
Cell::new((0, 0), ExcelDataType::String(String::from("int_column"))),
|
||||
Cell::new((0, 1), ExcelDataType::String(String::from("bool_column"))),
|
||||
Cell::new((0, 2), ExcelDataType::String(String::from("float column"))),
|
||||
Cell::new((0, 3), ExcelDataType::String(String::from("string_column"))),
|
||||
Cell::new(
|
||||
(0, 4),
|
||||
ExcelDataType::String(String::from("datetime_column")),
|
||||
),
|
||||
let range = calamine::Range::<ExcelData>::from_sparse(vec![
|
||||
Cell::new((0, 0), ExcelData::String(String::from("int_column"))),
|
||||
Cell::new((0, 1), ExcelData::String(String::from("bool_column"))),
|
||||
Cell::new((0, 2), ExcelData::String(String::from("float column"))),
|
||||
Cell::new((0, 3), ExcelData::String(String::from("string_column"))),
|
||||
Cell::new((0, 4), ExcelData::String(String::from("datetime_column"))),
|
||||
Cell::new(
|
||||
(0, 5),
|
||||
ExcelDataType::String(String::from("datetime iso column")),
|
||||
),
|
||||
Cell::new(
|
||||
(0, 6),
|
||||
ExcelDataType::String(String::from("duration column")),
|
||||
ExcelData::String(String::from("datetime iso column")),
|
||||
),
|
||||
Cell::new((0, 6), ExcelData::String(String::from("duration column"))),
|
||||
Cell::new(
|
||||
(0, 7),
|
||||
ExcelDataType::String(String::from("duration iso column")),
|
||||
ExcelData::String(String::from("duration iso column")),
|
||||
),
|
||||
Cell::new((1, 0), ExcelDataType::Int(0)),
|
||||
Cell::new((1, 1), ExcelDataType::Bool(true)),
|
||||
Cell::new((1, 2), ExcelDataType::Float(0.333)),
|
||||
Cell::new((1, 3), ExcelDataType::String(String::from("test"))),
|
||||
Cell::new((1, 4), ExcelDataType::DateTime(44986.12)),
|
||||
Cell::new((1, 5), ExcelDataType::DateTimeIso(String::from("test"))),
|
||||
Cell::new((1, 6), ExcelDataType::Duration(44986.12)),
|
||||
Cell::new((1, 7), ExcelDataType::DurationIso(String::from("test"))),
|
||||
Cell::new((2, 0), ExcelDataType::Empty),
|
||||
Cell::new((2, 0), ExcelDataType::Empty),
|
||||
Cell::new((2, 1), ExcelDataType::Empty),
|
||||
Cell::new((2, 2), ExcelDataType::Empty),
|
||||
Cell::new((2, 3), ExcelDataType::Empty),
|
||||
Cell::new((2, 4), ExcelDataType::Empty),
|
||||
Cell::new((2, 5), ExcelDataType::Empty),
|
||||
Cell::new((2, 6), ExcelDataType::Empty),
|
||||
Cell::new((2, 7), ExcelDataType::Empty),
|
||||
Cell::new((1, 0), ExcelData::Int(0)),
|
||||
Cell::new((1, 1), ExcelData::Bool(true)),
|
||||
Cell::new((1, 2), ExcelData::Float(0.333)),
|
||||
Cell::new((1, 3), ExcelData::String(String::from("test"))),
|
||||
Cell::new(
|
||||
(1, 4),
|
||||
ExcelData::DateTime(calamine::ExcelDateTime::new(
|
||||
44986.12,
|
||||
calamine::ExcelDateTimeType::DateTime,
|
||||
false,
|
||||
)),
|
||||
),
|
||||
Cell::new((1, 5), ExcelData::DateTimeIso(String::from("test"))),
|
||||
Cell::new((1, 6), ExcelData::Float(44986.12)),
|
||||
Cell::new((1, 7), ExcelData::String(String::from("test"))),
|
||||
Cell::new((2, 0), ExcelData::Empty),
|
||||
Cell::new((2, 0), ExcelData::Empty),
|
||||
Cell::new((2, 1), ExcelData::Empty),
|
||||
Cell::new((2, 2), ExcelData::Empty),
|
||||
Cell::new((2, 3), ExcelData::Empty),
|
||||
Cell::new((2, 4), ExcelData::Empty),
|
||||
Cell::new((2, 5), ExcelData::Empty),
|
||||
Cell::new((2, 6), ExcelData::Empty),
|
||||
Cell::new((2, 7), ExcelData::Empty),
|
||||
]);
|
||||
|
||||
let schema = infer_schema(&range, &TableOptionExcel::default(), &None).unwrap();
|
||||
@ -537,24 +553,16 @@ mod tests {
|
||||
DataType::Timestamp(TimeUnit::Second, None),
|
||||
true
|
||||
),
|
||||
Field::new(
|
||||
"duration_column",
|
||||
DataType::Duration(TimeUnit::Second),
|
||||
true
|
||||
),
|
||||
Field::new(
|
||||
"duration_iso_column",
|
||||
DataType::Duration(TimeUnit::Second),
|
||||
true
|
||||
),
|
||||
Field::new("duration_column", DataType::Float64, true),
|
||||
Field::new("duration_iso_column", DataType::Utf8, true),
|
||||
])
|
||||
);
|
||||
|
||||
let range = calamine::Range::<ExcelDataType>::from_sparse(vec![
|
||||
Cell::new((0, 0), ExcelDataType::String(String::from("test_column"))),
|
||||
Cell::new((1, 0), ExcelDataType::Int(0)),
|
||||
Cell::new((2, 0), ExcelDataType::Empty),
|
||||
Cell::new((2, 0), ExcelDataType::Float(0.5)),
|
||||
let range = calamine::Range::<ExcelData>::from_sparse(vec![
|
||||
Cell::new((0, 0), ExcelData::String(String::from("test_column"))),
|
||||
Cell::new((1, 0), ExcelData::Int(0)),
|
||||
Cell::new((2, 0), ExcelData::Empty),
|
||||
Cell::new((2, 0), ExcelData::Float(0.5)),
|
||||
]);
|
||||
|
||||
let schema = infer_schema(&range, &TableOptionExcel::default(), &None).unwrap();
|
||||
@ -564,27 +572,27 @@ mod tests {
|
||||
Schema::new(vec![Field::new("test_column", DataType::Utf8, true)])
|
||||
);
|
||||
|
||||
let range = calamine::Range::<ExcelDataType>::from_sparse(vec![
|
||||
Cell::new((0, 0), ExcelDataType::String(String::from("int_column"))),
|
||||
Cell::new((0, 1), ExcelDataType::Empty),
|
||||
Cell::new((0, 2), ExcelDataType::String(String::from("float column"))),
|
||||
let range = calamine::Range::<ExcelData>::from_sparse(vec![
|
||||
Cell::new((0, 0), ExcelData::String(String::from("int_column"))),
|
||||
Cell::new((0, 1), ExcelData::Empty),
|
||||
Cell::new((0, 2), ExcelData::String(String::from("float column"))),
|
||||
]);
|
||||
|
||||
assert!(infer_schema(&range, &TableOptionExcel::default(), &None).is_err());
|
||||
|
||||
let range = calamine::Range::<ExcelDataType>::from_sparse(vec![
|
||||
Cell::new((0, 0), ExcelDataType::String(String::from("column1"))),
|
||||
Cell::new((0, 1), ExcelDataType::String(String::from("column2"))),
|
||||
Cell::new((1, 0), ExcelDataType::Int(1)),
|
||||
Cell::new((1, 1), ExcelDataType::Int(1)),
|
||||
Cell::new((1, 3), ExcelDataType::Int(1)),
|
||||
let range = calamine::Range::<ExcelData>::from_sparse(vec![
|
||||
Cell::new((0, 0), ExcelData::String(String::from("column1"))),
|
||||
Cell::new((0, 1), ExcelData::String(String::from("column2"))),
|
||||
Cell::new((1, 0), ExcelData::Int(1)),
|
||||
Cell::new((1, 1), ExcelData::Int(1)),
|
||||
Cell::new((1, 3), ExcelData::Int(1)),
|
||||
]);
|
||||
assert!(infer_schema(&range, &TableOptionExcel::default(), &None).is_err());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn inferes_schema_from_config() {
|
||||
let range = calamine::Range::<ExcelDataType>::from_sparse(vec![]);
|
||||
let range = calamine::Range::<ExcelData>::from_sparse(vec![]);
|
||||
let table_schema = TableSchema {
|
||||
columns: vec![
|
||||
TableColumn {
|
||||
@ -648,7 +656,7 @@ sheet_name = "uk_cities_with_headers"
|
||||
.scan(&ctx.state(), None, &[], None)
|
||||
.await
|
||||
.unwrap()
|
||||
.statistics()
|
||||
.partition_statistics(None)
|
||||
.unwrap();
|
||||
assert_eq!(stats.num_rows, Precision::Exact(37));
|
||||
}
|
||||
@ -674,7 +682,7 @@ option:
|
||||
.scan(&ctx.state(), None, &[], None)
|
||||
.await
|
||||
.unwrap()
|
||||
.statistics()
|
||||
.partition_statistics(None)
|
||||
.unwrap();
|
||||
assert_eq!(stats.num_rows, Precision::Exact(37));
|
||||
}
|
||||
@ -704,7 +712,7 @@ option:
|
||||
.scan(&ctx.state(), None, &[], None)
|
||||
.await
|
||||
.unwrap()
|
||||
.statistics()
|
||||
.partition_statistics(None)
|
||||
.unwrap();
|
||||
assert_eq!(stats.column_statistics.len(), 6);
|
||||
assert_eq!(stats.num_rows, Precision::Exact(3));
|
||||
@ -712,33 +720,46 @@ option:
|
||||
|
||||
#[test]
|
||||
fn transforms_excel_range_to_record_batch() {
|
||||
let range: calamine::Range<ExcelDataType> =
|
||||
calamine::Range::<ExcelDataType>::from_sparse(vec![
|
||||
Cell::new((0, 0), ExcelDataType::String("float_column".to_string())),
|
||||
Cell::new((1, 0), ExcelDataType::Float(1.333)),
|
||||
Cell::new((2, 0), ExcelDataType::Empty),
|
||||
Cell::new((3, 0), ExcelDataType::Float(3.333)),
|
||||
Cell::new((0, 1), ExcelDataType::String("integer_column".to_string())),
|
||||
Cell::new((1, 1), ExcelDataType::Int(1)),
|
||||
Cell::new((2, 1), ExcelDataType::Int(3)),
|
||||
Cell::new((3, 1), ExcelDataType::Empty),
|
||||
Cell::new((0, 2), ExcelDataType::String("boolean_column".to_string())),
|
||||
Cell::new((1, 2), ExcelDataType::Empty),
|
||||
Cell::new((2, 2), ExcelDataType::Bool(true)),
|
||||
Cell::new((3, 2), ExcelDataType::Bool(false)),
|
||||
Cell::new((0, 3), ExcelDataType::String("string_column".to_string())),
|
||||
Cell::new((1, 3), ExcelDataType::String("foo".to_string())),
|
||||
Cell::new((2, 3), ExcelDataType::String("bar".to_string())),
|
||||
Cell::new((3, 3), ExcelDataType::String("baz".to_string())),
|
||||
Cell::new((0, 4), ExcelDataType::String("mixed_column".to_string())),
|
||||
Cell::new((1, 4), ExcelDataType::Float(1.1)),
|
||||
Cell::new((2, 4), ExcelDataType::Int(1)),
|
||||
Cell::new((3, 4), ExcelDataType::Empty),
|
||||
Cell::new((0, 5), ExcelDataType::String("datetime_column".to_string())),
|
||||
Cell::new((1, 5), ExcelDataType::DateTime(44986.12)), // 2023-03-01T02:52:48
|
||||
Cell::new((2, 5), ExcelDataType::Empty),
|
||||
Cell::new((3, 5), ExcelDataType::DateTime(44900.12)), // 2022-12-05T02:52:48
|
||||
]);
|
||||
let range: calamine::Range<ExcelData> = calamine::Range::<ExcelData>::from_sparse(vec![
|
||||
Cell::new((0, 0), ExcelData::String("float_column".to_string())),
|
||||
Cell::new((1, 0), ExcelData::Float(1.333)),
|
||||
Cell::new((2, 0), ExcelData::Empty),
|
||||
Cell::new((3, 0), ExcelData::Float(3.333)),
|
||||
Cell::new((0, 1), ExcelData::String("integer_column".to_string())),
|
||||
Cell::new((1, 1), ExcelData::Int(1)),
|
||||
Cell::new((2, 1), ExcelData::Int(3)),
|
||||
Cell::new((3, 1), ExcelData::Empty),
|
||||
Cell::new((0, 2), ExcelData::String("boolean_column".to_string())),
|
||||
Cell::new((1, 2), ExcelData::Empty),
|
||||
Cell::new((2, 2), ExcelData::Bool(true)),
|
||||
Cell::new((3, 2), ExcelData::Bool(false)),
|
||||
Cell::new((0, 3), ExcelData::String("string_column".to_string())),
|
||||
Cell::new((1, 3), ExcelData::String("foo".to_string())),
|
||||
Cell::new((2, 3), ExcelData::String("bar".to_string())),
|
||||
Cell::new((3, 3), ExcelData::String("baz".to_string())),
|
||||
Cell::new((0, 4), ExcelData::String("mixed_column".to_string())),
|
||||
Cell::new((1, 4), ExcelData::Float(1.1)),
|
||||
Cell::new((2, 4), ExcelData::Int(1)),
|
||||
Cell::new((3, 4), ExcelData::Empty),
|
||||
Cell::new((0, 5), ExcelData::String("datetime_column".to_string())),
|
||||
Cell::new(
|
||||
(1, 5),
|
||||
ExcelData::DateTime(calamine::ExcelDateTime::new(
|
||||
44986.12,
|
||||
calamine::ExcelDateTimeType::DateTime,
|
||||
false,
|
||||
)),
|
||||
), // 2023-03-01T02:52:48
|
||||
Cell::new((2, 5), ExcelData::Empty),
|
||||
Cell::new(
|
||||
(3, 5),
|
||||
ExcelData::DateTime(calamine::ExcelDateTime::new(
|
||||
44900.12,
|
||||
calamine::ExcelDateTimeType::DateTime,
|
||||
false,
|
||||
)),
|
||||
), // 2022-12-05T02:52:48
|
||||
]);
|
||||
|
||||
let shema = infer_schema(&range, &TableOptionExcel::default(), &None).unwrap();
|
||||
let rb = excel_range_to_record_batch(range, &TableOptionExcel::default(), shema).unwrap();
|
||||
|
||||
@ -1082,7 +1082,7 @@ schema:
|
||||
.scan(&ctx.state(), None, &[], None)
|
||||
.await
|
||||
.unwrap()
|
||||
.statistics()
|
||||
.partition_statistics(None)
|
||||
.unwrap();
|
||||
assert_eq!(stats.num_rows, Precision::Exact(37));
|
||||
}
|
||||
@ -1107,7 +1107,7 @@ uri: "sqlite://../test_data/sqlite/sample.{ext}"
|
||||
.scan(&ctx.state(), None, &[], None)
|
||||
.await
|
||||
.unwrap()
|
||||
.statistics()
|
||||
.partition_statistics(None)
|
||||
.unwrap();
|
||||
assert_eq!(stats.num_rows, Precision::Exact(37));
|
||||
}
|
||||
|
||||
@ -199,13 +199,17 @@ mod tests {
|
||||
.scan(&ctx.state(), None, &[], None)
|
||||
.await
|
||||
.unwrap()
|
||||
.statistics()
|
||||
.partition_statistics(None)
|
||||
.unwrap();
|
||||
assert_eq!(stats.num_rows, Precision::Exact(500));
|
||||
if stats.num_rows != Precision::Absent {
|
||||
assert_eq!(stats.num_rows, Precision::Exact(500));
|
||||
}
|
||||
let stats = stats.column_statistics;
|
||||
assert_eq!(stats[0].null_count, Precision::Exact(245));
|
||||
assert_eq!(stats[1].null_count, Precision::Exact(373));
|
||||
assert_eq!(stats[2].null_count, Precision::Exact(237));
|
||||
if !stats.is_empty() && stats[0].null_count != Precision::Absent {
|
||||
assert_eq!(stats[0].null_count, Precision::Exact(245));
|
||||
assert_eq!(stats[1].null_count, Precision::Exact(373));
|
||||
assert_eq!(stats[2].null_count, Precision::Exact(237));
|
||||
}
|
||||
|
||||
match t.table.as_any().downcast_ref::<ListingTable>() {
|
||||
Some(_) => {}
|
||||
@ -227,9 +231,11 @@ mod tests {
|
||||
.scan(&ctx.state(), None, &[], None)
|
||||
.await
|
||||
.unwrap()
|
||||
.statistics()
|
||||
.partition_statistics(None)
|
||||
.unwrap();
|
||||
assert_eq!(stats.num_rows, Precision::Exact(500));
|
||||
if stats.num_rows != Precision::Absent {
|
||||
assert_eq!(stats.num_rows, Precision::Exact(500));
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
@ -258,7 +264,7 @@ mod tests {
|
||||
.scan(&ctx.state(), None, &[], None)
|
||||
.await
|
||||
.unwrap()
|
||||
.statistics()
|
||||
.partition_statistics(None)
|
||||
.unwrap();
|
||||
assert_eq!(stats.num_rows, Precision::Exact(1500));
|
||||
}
|
||||
|
||||
@ -12,7 +12,8 @@ use crate::table;
|
||||
|
||||
pub fn test_data_path(relative_path: &str) -> String {
|
||||
let mut d = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
|
||||
d.push("../test_data");
|
||||
d.pop();
|
||||
d.push("test_data");
|
||||
d.push(relative_path);
|
||||
d.to_string_lossy().to_string()
|
||||
}
|
||||
|
||||
@ -20,7 +20,7 @@ mod mysql {
|
||||
.scan(&ctx.state(), None, &[], None)
|
||||
.await
|
||||
.unwrap()
|
||||
.statistics()
|
||||
.partition_statistics(None)
|
||||
.unwrap();
|
||||
assert!(stats.num_rows.get_value().is_some());
|
||||
}
|
||||
|
||||
@ -20,7 +20,7 @@ mod postgres {
|
||||
.scan(&ctx.state(), None, &[], None)
|
||||
.await
|
||||
.unwrap()
|
||||
.statistics()
|
||||
.partition_statistics(None)
|
||||
.unwrap();
|
||||
assert!(stats.num_rows.get_value().is_some());
|
||||
}
|
||||
|
||||
@ -12,14 +12,14 @@ name = "roapi-ui"
|
||||
path = "src/main.rs"
|
||||
|
||||
[dependencies]
|
||||
ehttp = { version = "0", features = ["json"] }
|
||||
ehttp = { version = "0.6", features = ["json"] }
|
||||
serde = { version = "1", features = ["derive"] }
|
||||
serde_json = "1"
|
||||
|
||||
# query ui
|
||||
eframe = "0"
|
||||
egui = "0"
|
||||
egui_extras = { version = "0", features = ["syntect"] }
|
||||
eframe = "0.33"
|
||||
egui = "0.33"
|
||||
egui_extras = { version = "0.33", features = ["syntect"] }
|
||||
|
||||
[target.'cfg(target_arch = "wasm32")'.dependencies]
|
||||
wasm-bindgen-futures = "0"
|
||||
|
||||
@ -4,4 +4,4 @@ public_url ="/ui/"
|
||||
minify = "on_release"
|
||||
|
||||
[tools]
|
||||
wasm_bindgen = "0.2.100"
|
||||
wasm_bindgen = "0.2.114"
|
||||
|
||||
@ -311,12 +311,17 @@ impl ROAPIUI {
|
||||
fn query_editor(ctx: &egui::Context, ui: &mut egui::Ui, query: &mut String) {
|
||||
let theme = egui_extras::syntax_highlighting::CodeTheme::from_memory(ctx, ui.style());
|
||||
|
||||
let mut layouter = |ui: &egui::Ui, string: &str, wrap_width: f32| {
|
||||
let mut layouter = |ui: &egui::Ui, string: &dyn egui::TextBuffer, wrap_width: f32| {
|
||||
let language = "sql";
|
||||
let mut layout_job =
|
||||
egui_extras::syntax_highlighting::highlight(ctx, ui.style(), &theme, string, language);
|
||||
layout_job.wrap.max_width = wrap_width;
|
||||
ui.fonts(|f| f.layout_job(layout_job))
|
||||
let mut job = egui_extras::syntax_highlighting::highlight(
|
||||
ctx,
|
||||
ui.style(),
|
||||
&theme,
|
||||
string.as_str(),
|
||||
language,
|
||||
);
|
||||
job.wrap.max_width = wrap_width;
|
||||
ui.painter().layout_job(job)
|
||||
};
|
||||
|
||||
ui.add(
|
||||
@ -371,7 +376,7 @@ impl eframe::App for ROAPIUI {
|
||||
ui.menu_button("⚙", |ui| {
|
||||
if ui.button("Syntax Highlight Theme").clicked() {
|
||||
self.show_settings = true;
|
||||
ui.close_menu();
|
||||
ui.close();
|
||||
}
|
||||
});
|
||||
});
|
||||
|
||||
@ -1,6 +1,6 @@
|
||||
[package]
|
||||
name = "roapi"
|
||||
version = "0.12.7"
|
||||
version = "0.13.0"
|
||||
authors = ["QP Hou <dave2008713@gmail.com>"]
|
||||
homepage = "https://github.com/roapi/roapi"
|
||||
license = "MIT"
|
||||
@ -21,9 +21,9 @@ snmalloc-rs = { version = "0.3", optional = true }
|
||||
|
||||
# dependencies related to axum
|
||||
tokio = { version = "1", features = ["rt-multi-thread"] }
|
||||
axum = { version = "0.7", features = ["default", "http2"] }
|
||||
axum = { version = "0.8", features = ["default", "http2"] }
|
||||
tower = { version = "0" } # introduced only for tower::layer::util::Stack
|
||||
tower-http = { version = "0.5", features = ["cors", "trace"] }
|
||||
tower-http = { version = "0.6", features = ["cors", "trace"] }
|
||||
tower-layer = "0"
|
||||
tower-service = "0"
|
||||
tracing = "0"
|
||||
@ -44,8 +44,8 @@ thiserror = "1"
|
||||
snafu = "0"
|
||||
|
||||
# flight-sql
|
||||
arrow-flight = { version = "55", features = ["flight-sql-experimental"] }
|
||||
tonic = { version = "0", features = ["tls"] }
|
||||
arrow-flight = { version = "57", features = ["flight-sql-experimental"] }
|
||||
tonic = { version = "0.14", features = ["tls-webpki-roots"] }
|
||||
prost = "0"
|
||||
futures = "0"
|
||||
# TODO: remove once_cell dependency
|
||||
@ -58,7 +58,7 @@ rust-embed = { version = "8", optional = true }
|
||||
mime_guess = { version = "2", optional = true }
|
||||
|
||||
# postgres wire protocol
|
||||
pgwire = "0.30.2"
|
||||
pgwire = "0.35"
|
||||
hex = "0.4"
|
||||
|
||||
[features]
|
||||
@ -74,7 +74,7 @@ database-postgres = ["columnq/database-postgres"]
|
||||
ui = ["rust-embed", "mime_guess"]
|
||||
|
||||
[dev-dependencies]
|
||||
reqwest = { version = "0", default-features = false, features = [
|
||||
reqwest = { version = "0.12", default-features = false, features = [
|
||||
"json",
|
||||
"rustls-tls",
|
||||
] }
|
||||
@ -83,8 +83,8 @@ tokio-postgres = "0.7"
|
||||
tower = "*"
|
||||
tempfile = "*"
|
||||
tokio-stream = { version = "*", features = ["net"] }
|
||||
arrow-cast = "55"
|
||||
arrow-ipc = "55" # for flight_sql test
|
||||
arrow-cast = { version = "57", features = ["prettyprint"] }
|
||||
arrow-ipc = "57" # for flight_sql test
|
||||
|
||||
# TODO: uncomment this when we exclude roapi from root workspace
|
||||
# [profile.release]
|
||||
|
||||
@ -16,13 +16,13 @@ pub async fn version() -> Result<impl IntoResponse, crate::error::ApiErrResp> {
|
||||
pub fn register_api_routes<H: RoapiContext>() -> Router {
|
||||
let mut api_routes = Router::new()
|
||||
.route("/version", get(version))
|
||||
.route("/tables/:table_name", get(api::rest::get_table::<H>))
|
||||
.route("/tables/{table_name}", get(api::rest::get_table::<H>))
|
||||
.route("/sql", post(api::sql::post::<H>))
|
||||
.route("/kv/:kv_name/:key", get(api::kv::get::<H>))
|
||||
.route("/kv/{kv_name}/{key}", get(api::kv::get::<H>))
|
||||
.route("/graphql", post(api::graphql::post::<H>))
|
||||
.route("/schema", get(api::schema::schema::<H>))
|
||||
.route(
|
||||
"/schema/:table_name",
|
||||
"/schema/{table_name}",
|
||||
get(api::schema::get_by_table_name::<H>),
|
||||
);
|
||||
|
||||
|
||||
@ -841,7 +841,7 @@ impl<H: RoapiContext> FlightSqlService for RoapiFlightSqlService<H> {
|
||||
|
||||
let plan_schema = plan.schema();
|
||||
|
||||
let arrow_schema = (&**plan_schema).into();
|
||||
let arrow_schema = plan_schema.as_arrow().clone();
|
||||
let message = SchemaAsIpc::new(&arrow_schema, &IpcWriteOptions::default())
|
||||
.try_into()
|
||||
.map_err(|e| internal_error!("Unable to serialize schema", e))?;
|
||||
|
||||
@ -23,7 +23,7 @@ pub enum Error {
|
||||
BindTcp { source: std::io::Error },
|
||||
}
|
||||
|
||||
pub type HttpApiServe = axum::serve::Serve<axum::Router, axum::Router>;
|
||||
pub type HttpApiServe = axum::serve::Serve<tokio::net::TcpListener, axum::Router, axum::Router>;
|
||||
|
||||
pub async fn health() -> Result<impl IntoResponse, crate::error::ApiErrResp> {
|
||||
Ok(api::bytes_to_resp("OK".into(), "text/plain"))
|
||||
|
||||
@ -20,7 +20,6 @@ use futures::stream;
|
||||
use futures::Sink;
|
||||
use log::info;
|
||||
use pgwire::api::auth::noop::NoopStartupHandler;
|
||||
use pgwire::api::copy::NoopCopyHandler;
|
||||
use pgwire::api::portal::Portal;
|
||||
use pgwire::api::query::{ExtendedQueryHandler, SimpleQueryHandler};
|
||||
use pgwire::api::results::{
|
||||
@ -28,7 +27,7 @@ use pgwire::api::results::{
|
||||
QueryResponse, Response,
|
||||
};
|
||||
use pgwire::api::stmt::{NoopQueryParser, StoredStatement};
|
||||
use pgwire::api::{ClientInfo, ClientPortalStore, NoopErrorHandler, PgWireServerHandlers, Type};
|
||||
use pgwire::api::{ClientInfo, ClientPortalStore, PgWireServerHandlers, Type};
|
||||
use pgwire::error::{ErrorInfo, PgWireError, PgWireResult};
|
||||
use pgwire::messages::PgWireBackendMessage;
|
||||
use pgwire::tokio::process_socket;
|
||||
@ -112,7 +111,7 @@ impl<H: RoapiContext> RoapiQueryHandler<H> {
|
||||
Self { ctx }
|
||||
}
|
||||
|
||||
async fn execute_query(&self, query: &str) -> PgWireResult<QueryResponse<'static>> {
|
||||
async fn execute_query(&self, query: &str) -> PgWireResult<QueryResponse> {
|
||||
info!("executing query: {query}");
|
||||
|
||||
// Handle some special PostgreSQL queries
|
||||
@ -359,7 +358,7 @@ impl<H: RoapiContext> NoopStartupHandler for RoapiQueryHandler<H> {
|
||||
|
||||
#[async_trait]
|
||||
impl<H: RoapiContext> SimpleQueryHandler for RoapiQueryHandler<H> {
|
||||
async fn do_query<'a, C>(&self, _client: &mut C, query: &str) -> PgWireResult<Vec<Response<'a>>>
|
||||
async fn do_query<C>(&self, _client: &mut C, query: &str) -> PgWireResult<Vec<Response>>
|
||||
where
|
||||
C: ClientInfo + ClientPortalStore + Sink<PgWireBackendMessage> + Unpin + Send + Sync,
|
||||
C::Error: Debug,
|
||||
@ -412,12 +411,12 @@ impl<H: RoapiContext> ExtendedQueryHandler for RoapiQueryHandler<H> {
|
||||
Arc::new(NoopQueryParser)
|
||||
}
|
||||
|
||||
async fn do_query<'a, C>(
|
||||
async fn do_query<C>(
|
||||
&self,
|
||||
_client: &mut C,
|
||||
portal: &Portal<Self::Statement>,
|
||||
_max_rows: usize,
|
||||
) -> PgWireResult<Response<'a>>
|
||||
) -> PgWireResult<Response>
|
||||
where
|
||||
C: ClientInfo + ClientPortalStore + Sink<PgWireBackendMessage> + Unpin + Send + Sync,
|
||||
C::PortalStore: Send + Sync,
|
||||
@ -475,31 +474,17 @@ struct RoapiHandlerFactory<H: RoapiContext> {
|
||||
}
|
||||
|
||||
impl<H: RoapiContext> PgWireServerHandlers for RoapiHandlerFactory<H> {
|
||||
type StartupHandler = RoapiQueryHandler<H>;
|
||||
type SimpleQueryHandler = RoapiQueryHandler<H>;
|
||||
type ExtendedQueryHandler = RoapiQueryHandler<H>;
|
||||
type CopyHandler = NoopCopyHandler;
|
||||
type ErrorHandler = NoopErrorHandler;
|
||||
|
||||
fn simple_query_handler(&self) -> Arc<Self::SimpleQueryHandler> {
|
||||
fn simple_query_handler(&self) -> Arc<impl SimpleQueryHandler> {
|
||||
self.handler.clone()
|
||||
}
|
||||
|
||||
fn extended_query_handler(&self) -> Arc<Self::ExtendedQueryHandler> {
|
||||
fn extended_query_handler(&self) -> Arc<impl ExtendedQueryHandler> {
|
||||
self.handler.clone()
|
||||
}
|
||||
|
||||
fn startup_handler(&self) -> Arc<Self::StartupHandler> {
|
||||
fn startup_handler(&self) -> Arc<impl pgwire::api::auth::StartupHandler> {
|
||||
self.handler.clone()
|
||||
}
|
||||
|
||||
fn copy_handler(&self) -> Arc<Self::CopyHandler> {
|
||||
Arc::new(NoopCopyHandler)
|
||||
}
|
||||
|
||||
fn error_handler(&self) -> Arc<Self::ErrorHandler> {
|
||||
Arc::new(NoopErrorHandler)
|
||||
}
|
||||
}
|
||||
|
||||
pub struct PostgresServer<H: RoapiContext> {
|
||||
|
||||
@ -11,7 +11,7 @@ fn test_load_yaml_datafusion_config() {
|
||||
let cfg: Config = serde_yaml::from_str(&config_content).unwrap();
|
||||
let df_cfg = cfg.get_datafusion_config().unwrap();
|
||||
|
||||
assert_eq!(df_cfg.options().sql_parser.dialect, "Hive");
|
||||
assert_eq!(df_cfg.options().sql_parser.dialect.to_string(), "hive");
|
||||
assert!(df_cfg.options().explain.physical_plan_only);
|
||||
assert_eq!(df_cfg.options().optimizer.max_passes, 10);
|
||||
assert_eq!(df_cfg.options().execution.batch_size, 100);
|
||||
|
||||
@ -7,7 +7,7 @@ use axum::http::HeaderMap;
|
||||
use axum::{extract::State, response::IntoResponse, routing::get};
|
||||
|
||||
async fn http_server() -> (
|
||||
axum::serve::Serve<axum::Router, axum::Router>,
|
||||
axum::serve::Serve<tokio::net::TcpListener, axum::Router, axum::Router>,
|
||||
std::net::SocketAddr,
|
||||
) {
|
||||
async fn serve_json(
|
||||
|
||||
Loading…
Reference in New Issue
Block a user