mirror of
https://github.com/roapi/roapi.git
synced 2026-06-05 21:04:02 +08:00
Add MySQL, Sqlite support. (#162)
* added MySQL and Sqlite datasource support * updated arrow, datafusion and deltalake to latest version * cleared simd ci test cache to workaround nightly compiler bug
This commit is contained in:
parent
49aeaf1d71
commit
3ace6078fa
47
.github/workflows/build.yml
vendored
47
.github/workflows/build.yml
vendored
@ -60,10 +60,10 @@ jobs:
|
||||
~/.cargo/registry/cache/
|
||||
~/.cargo/git/db/
|
||||
target/
|
||||
key: simd-${{ runner.os }}-cargo-${{ hashFiles('**/Cargo.lock', '**/Cargo.toml') }}
|
||||
key: simd-2-${{ runner.os }}-cargo-${{ hashFiles('**/Cargo.lock', '**/Cargo.toml') }}
|
||||
restore-keys: |
|
||||
simd-${{ runner.os }}-cargo-${{ hashFiles('**/Cargo.lock', '**/Cargo.toml') }}
|
||||
simd-${{ runner.os }}-cargo-
|
||||
simd-2-${{ runner.os }}-cargo-${{ hashFiles('**/Cargo.lock', '**/Cargo.toml') }}
|
||||
simd-2-${{ runner.os }}-cargo-
|
||||
- name: Install nightly rust
|
||||
uses: actions-rs/toolchain@v1
|
||||
with:
|
||||
@ -78,6 +78,41 @@ jobs:
|
||||
which cargo-cache || cargo install cargo-cache
|
||||
cargo cache trim -l 1G
|
||||
|
||||
database_test:
|
||||
runs-on: ubuntu-latest
|
||||
env:
|
||||
RUSTFLAGS: "-C target-cpu=skylake"
|
||||
steps:
|
||||
- uses: actions/checkout@v2
|
||||
- uses: actions/cache@v2
|
||||
with:
|
||||
path: |
|
||||
~/.cargo/bin/
|
||||
~/.cargo/registry/index/
|
||||
~/.cargo/registry/cache/
|
||||
~/.cargo/git/db/
|
||||
target/
|
||||
key: database-${{ runner.os }}-cargo-${{ hashFiles('**/Cargo.lock', '**/Cargo.toml') }}
|
||||
restore-keys: |
|
||||
database-${{ runner.os }}-cargo-${{ hashFiles('**/Cargo.lock', '**/Cargo.toml') }}
|
||||
database-${{ runner.os }}-cargo-
|
||||
- name: Install nightly rust
|
||||
uses: actions-rs/toolchain@v1
|
||||
with:
|
||||
profile: default
|
||||
toolchain: nightly-${{ env.RUST_TC_NIGHTLY_VER }}
|
||||
override: true
|
||||
- name: Check
|
||||
run: cargo clippy --features database
|
||||
- name: Build
|
||||
run: cargo build --features database
|
||||
- name: Run tests
|
||||
run: cargo test --features database
|
||||
- name: Trim cache
|
||||
run: |
|
||||
which cargo-cache || cargo install cargo-cache
|
||||
cargo cache trim -l 1G
|
||||
|
||||
# make sure native-tls always builds
|
||||
openssl_build:
|
||||
runs-on: ubuntu-latest
|
||||
@ -91,6 +126,10 @@ jobs:
|
||||
override: true
|
||||
- name: Check
|
||||
run: cargo clippy --no-default-features --features=native-tls
|
||||
- name: Trim cache
|
||||
run: |
|
||||
which cargo-cache || cargo install cargo-cache
|
||||
cargo cache trim -l 1G
|
||||
|
||||
# cross compile from x86 mac to arm64, this is to make sure universal2
|
||||
# release will build without error
|
||||
@ -125,7 +164,7 @@ jobs:
|
||||
# set SDKROOT for C dependencies
|
||||
export SDKROOT=$(xcrun --sdk macosx --show-sdk-path)
|
||||
cd roapi-http && \
|
||||
cargo build --bin roapi-http --target aarch64-apple-darwin
|
||||
cargo build --bin roapi-http --features database --target aarch64-apple-darwin
|
||||
- name: Trim cache
|
||||
run: |
|
||||
which cargo-cache || cargo install cargo-cache
|
||||
|
||||
1159
Cargo.lock
generated
1159
Cargo.lock
generated
File diff suppressed because it is too large
Load Diff
@ -7,7 +7,7 @@ use std::path::PathBuf;
|
||||
|
||||
use columnq::datafusion::arrow::util::pretty;
|
||||
use columnq::table::parse_table_uri_arg;
|
||||
use columnq::{encoding, ColumnQ, ExecutionConfig};
|
||||
use columnq::{encoding, ColumnQ, SessionConfig};
|
||||
|
||||
#[cfg(snmalloc)]
|
||||
#[global_allocator]
|
||||
@ -87,7 +87,7 @@ async fn console_loop(cq: &ColumnQ) -> anyhow::Result<()> {
|
||||
}
|
||||
|
||||
async fn cmd_console(args: &clap::ArgMatches) -> anyhow::Result<()> {
|
||||
let config = ExecutionConfig::default().with_information_schema(true);
|
||||
let config = SessionConfig::default().with_information_schema(true);
|
||||
let mut cq = ColumnQ::new_with_config(config);
|
||||
|
||||
if let Some(tables) = args.values_of("table") {
|
||||
@ -107,7 +107,7 @@ fn bytes_to_stdout(bytes: &[u8]) -> anyhow::Result<()> {
|
||||
}
|
||||
|
||||
async fn cmd_sql(args: &clap::ArgMatches) -> anyhow::Result<()> {
|
||||
let config = ExecutionConfig::default().with_information_schema(true);
|
||||
let config = SessionConfig::default().with_information_schema(true);
|
||||
let mut cq = ColumnQ::new_with_config(config);
|
||||
|
||||
if let Some(tables) = args.values_of("table") {
|
||||
|
||||
@ -11,7 +11,7 @@ name = "columnq"
|
||||
path = "src/lib.rs"
|
||||
|
||||
[dependencies]
|
||||
datafusion = "7"
|
||||
datafusion = { git = "https://github.com/apache/arrow-datafusion.git", rev = "f619d43494e182e260423f8333eed9ea789f0cb8", version = "7.0.0" }
|
||||
|
||||
log = "0"
|
||||
regex = "1"
|
||||
@ -34,7 +34,6 @@ reqwest = { version = "0.11", default-features = false, features = [
|
||||
|
||||
tokio = { version = "1", features = ["rt-multi-thread"] }
|
||||
futures = "0.3"
|
||||
|
||||
# S3
|
||||
rusoto_core = { version = "0.47", default-features = false }
|
||||
rusoto_s3 = { version = "0.47", default-features = false }
|
||||
@ -43,16 +42,25 @@ rusoto_sts = { version = "0.47", default-features = false }
|
||||
hyper-tls = { version = "0.5.0", default-features = false, optional = true }
|
||||
hyper-rustls = { version = "0.23.0", default-features = false, optional = true }
|
||||
|
||||
[dependencies.deltalake]
|
||||
git = "https://github.com/delta-io/delta-rs.git"
|
||||
rev = "0b99151d3a413bb3d90b318e7a825feb77e62902"
|
||||
default-features = false
|
||||
features = ["datafusion-ext"]
|
||||
|
||||
deltalake = { git = "https://github.com/delta-io/delta-rs.git", rev = "f5e4b5f94393b517d3d88c200d7169cce11b304b", default-features = false, features = [
|
||||
"datafusion-ext",
|
||||
] }
|
||||
[dependencies.connectorx]
|
||||
git = "https://github.com/sfu-db/connector-x.git"
|
||||
rev = "dba588ad26fb41a75d05cd54e08633b8063804c2"
|
||||
version = "0.2.5"
|
||||
features = ["default", "dst_arrow", "src_mysql", "src_sqlite"]
|
||||
optional = true
|
||||
|
||||
[dev-dependencies]
|
||||
anyhow = "1"
|
||||
serde_yaml = "0.8"
|
||||
tempdir = "0"
|
||||
pretty_assertions = "*"
|
||||
dotenv = "*"
|
||||
|
||||
[features]
|
||||
default = ["rustls"]
|
||||
@ -82,3 +90,4 @@ native-tls = [
|
||||
"yup-oauth2/hyper-tls",
|
||||
]
|
||||
simd = ["datafusion/simd"]
|
||||
database = ["connectorx"]
|
||||
|
||||
@ -5,8 +5,8 @@ use std::sync::Arc;
|
||||
use datafusion::arrow;
|
||||
use datafusion::arrow::array::as_string_array;
|
||||
use datafusion::arrow::array::StringArray;
|
||||
pub use datafusion::execution::context::ExecutionConfig;
|
||||
use datafusion::execution::context::ExecutionContext;
|
||||
pub use datafusion::execution::context::SessionConfig;
|
||||
use datafusion::execution::context::SessionContext;
|
||||
use datafusion::physical_plan::collect;
|
||||
|
||||
use crate::error::{ColumnQError, QueryError};
|
||||
@ -14,18 +14,18 @@ use crate::query;
|
||||
use crate::table::{self, KeyValueSource, TableSource};
|
||||
|
||||
pub struct ColumnQ {
|
||||
dfctx: ExecutionContext,
|
||||
dfctx: SessionContext,
|
||||
schema_map: HashMap<String, arrow::datatypes::SchemaRef>,
|
||||
kv_catalog: HashMap<String, Arc<HashMap<String, String>>>,
|
||||
}
|
||||
|
||||
impl ColumnQ {
|
||||
pub fn new() -> Self {
|
||||
Self::new_with_config(ExecutionConfig::default())
|
||||
Self::new_with_config(SessionConfig::default())
|
||||
}
|
||||
|
||||
pub fn new_with_config(config: ExecutionConfig) -> Self {
|
||||
let dfctx = ExecutionContext::with_config(config);
|
||||
pub fn new_with_config(config: SessionConfig) -> Self {
|
||||
let dfctx = SessionContext::with_config(config);
|
||||
let schema_map = HashMap::<String, arrow::datatypes::SchemaRef>::new();
|
||||
Self {
|
||||
dfctx,
|
||||
@ -59,7 +59,7 @@ impl ColumnQ {
|
||||
|
||||
let filters = &[];
|
||||
let exec_plan = table.scan(&projections, filters, None).await?;
|
||||
let batches = collect(exec_plan, self.dfctx.runtime_env()).await?;
|
||||
let batches = collect(exec_plan, self.dfctx.task_ctx()).await?;
|
||||
let mut kv = HashMap::new();
|
||||
for batch in batches {
|
||||
let col_key = batch
|
||||
|
||||
@ -65,6 +65,9 @@ pub enum ColumnQError {
|
||||
|
||||
#[error("Generic error: {0}")]
|
||||
Generic(String),
|
||||
|
||||
#[error("Database error: {0}")]
|
||||
Database(String),
|
||||
}
|
||||
|
||||
impl ColumnQError {
|
||||
|
||||
@ -181,9 +181,9 @@ fn to_datafusion_predicates<'a, 'b>(
|
||||
}
|
||||
|
||||
pub fn query_to_df(
|
||||
dfctx: &datafusion::execution::context::ExecutionContext,
|
||||
dfctx: &datafusion::execution::context::SessionContext,
|
||||
q: &str,
|
||||
) -> Result<Arc<dyn datafusion::dataframe::DataFrame>, QueryError> {
|
||||
) -> Result<Arc<datafusion::dataframe::DataFrame>, QueryError> {
|
||||
let doc = parse_query::<&str>(q)?;
|
||||
|
||||
let def = match doc.definitions.len() {
|
||||
@ -362,7 +362,7 @@ pub fn query_to_df(
|
||||
}
|
||||
|
||||
pub async fn exec_query(
|
||||
dfctx: &datafusion::execution::context::ExecutionContext,
|
||||
dfctx: &datafusion::execution::context::SessionContext,
|
||||
q: &str,
|
||||
) -> Result<Vec<arrow::record_batch::RecordBatch>, QueryError> {
|
||||
query_to_df(dfctx, q)?
|
||||
@ -374,7 +374,7 @@ pub async fn exec_query(
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use datafusion::arrow::array::*;
|
||||
use datafusion::execution::context::ExecutionContext;
|
||||
use datafusion::execution::context::SessionContext;
|
||||
use datafusion::logical_plan::{col, lit};
|
||||
|
||||
use super::*;
|
||||
@ -382,7 +382,7 @@ mod tests {
|
||||
|
||||
#[test]
|
||||
fn simple_query_planning() -> anyhow::Result<()> {
|
||||
let mut dfctx = ExecutionContext::new();
|
||||
let mut dfctx = SessionContext::new();
|
||||
register_table_properties(&mut dfctx)?;
|
||||
|
||||
let df = query_to_df(
|
||||
@ -414,7 +414,7 @@ mod tests {
|
||||
|
||||
#[test]
|
||||
fn consistent_and_deterministics_logical_plan() -> anyhow::Result<()> {
|
||||
let mut dfctx = ExecutionContext::new();
|
||||
let mut dfctx = SessionContext::new();
|
||||
register_table_properties(&mut dfctx)?;
|
||||
|
||||
let df = query_to_df(
|
||||
@ -449,7 +449,7 @@ mod tests {
|
||||
|
||||
#[tokio::test]
|
||||
async fn boolean_literal_as_predicate_operand() -> anyhow::Result<()> {
|
||||
let mut dfctx = ExecutionContext::new();
|
||||
let mut dfctx = SessionContext::new();
|
||||
register_table_properties(&mut dfctx)?;
|
||||
|
||||
let batches = exec_query(
|
||||
|
||||
@ -54,10 +54,10 @@ fn num_parse_err(e: std::num::ParseIntError) -> QueryError {
|
||||
}
|
||||
|
||||
pub fn table_query_to_df(
|
||||
dfctx: &datafusion::execution::context::ExecutionContext,
|
||||
dfctx: &datafusion::execution::context::SessionContext,
|
||||
table_name: &str,
|
||||
params: &HashMap<String, String>,
|
||||
) -> Result<Arc<dyn datafusion::dataframe::DataFrame>, QueryError> {
|
||||
) -> Result<Arc<datafusion::dataframe::DataFrame>, QueryError> {
|
||||
lazy_static! {
|
||||
static ref RE_REST_FILTER: Regex =
|
||||
Regex::new(r"filter\[(?P<column>.+)\](?P<op>.+)?").unwrap();
|
||||
@ -167,7 +167,7 @@ pub fn table_query_to_df(
|
||||
}
|
||||
|
||||
pub async fn query_table(
|
||||
dfctx: &datafusion::execution::context::ExecutionContext,
|
||||
dfctx: &datafusion::execution::context::SessionContext,
|
||||
table_name: &str,
|
||||
params: &HashMap<String, String>,
|
||||
) -> Result<Vec<RecordBatch>, QueryError> {
|
||||
@ -180,14 +180,14 @@ mod tests {
|
||||
use super::*;
|
||||
|
||||
use datafusion::arrow::array::*;
|
||||
use datafusion::execution::context::ExecutionContext;
|
||||
use datafusion::execution::context::SessionContext;
|
||||
use datafusion::prelude::*;
|
||||
|
||||
use crate::test_util::*;
|
||||
|
||||
#[tokio::test]
|
||||
async fn consistent_and_deterministics_logical_plan() -> anyhow::Result<()> {
|
||||
let mut dfctx = ExecutionContext::new();
|
||||
let mut dfctx = SessionContext::new();
|
||||
register_table_ubuntu_ami(&mut dfctx).await?;
|
||||
|
||||
let mut params = HashMap::<String, String>::new();
|
||||
@ -215,7 +215,7 @@ mod tests {
|
||||
|
||||
#[tokio::test]
|
||||
async fn simple_filter() -> anyhow::Result<()> {
|
||||
let mut dfctx = ExecutionContext::new();
|
||||
let mut dfctx = SessionContext::new();
|
||||
register_table_ubuntu_ami(&mut dfctx).await?;
|
||||
let mut params = HashMap::<String, String>::new();
|
||||
|
||||
|
||||
@ -5,15 +5,15 @@ use datafusion::arrow;
|
||||
use crate::error::QueryError;
|
||||
|
||||
pub async fn exec_query(
|
||||
dfctx: &datafusion::execution::context::ExecutionContext,
|
||||
dfctx: &datafusion::execution::context::SessionContext,
|
||||
sql: &str,
|
||||
) -> Result<Vec<arrow::record_batch::RecordBatch>, QueryError> {
|
||||
let plan = dfctx
|
||||
.create_logical_plan(sql)
|
||||
.map_err(QueryError::plan_sql)?;
|
||||
|
||||
let df: Arc<dyn datafusion::dataframe::DataFrame> = Arc::new(
|
||||
datafusion::execution::dataframe_impl::DataFrameImpl::new(dfctx.state.clone(), &plan),
|
||||
let df: Arc<datafusion::dataframe::DataFrame> = Arc::new(
|
||||
datafusion::dataframe::DataFrame::new(dfctx.state.clone(), &plan),
|
||||
);
|
||||
|
||||
df.collect().await.map_err(QueryError::query_exec)
|
||||
@ -22,14 +22,14 @@ pub async fn exec_query(
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use arrow::array::*;
|
||||
use datafusion::execution::context::ExecutionContext;
|
||||
use datafusion::execution::context::SessionContext;
|
||||
|
||||
use super::*;
|
||||
use crate::test_util::*;
|
||||
|
||||
#[tokio::test]
|
||||
async fn group_by_aggregation() -> anyhow::Result<()> {
|
||||
let mut dfctx = ExecutionContext::new();
|
||||
let mut dfctx = SessionContext::new();
|
||||
register_table_properties(&mut dfctx)?;
|
||||
|
||||
let batches = exec_query(
|
||||
|
||||
@ -13,7 +13,7 @@ pub async fn to_mem_table(
|
||||
) -> Result<datafusion::datasource::MemTable, ColumnQError> {
|
||||
debug!("loading arrow table data...");
|
||||
let mut schema_and_partitions = partitions_from_table_source!(t, |mut r| {
|
||||
let arrow_file_reader = arrow::ipc::reader::FileReader::try_new(&mut r)?;
|
||||
let arrow_file_reader = arrow::ipc::reader::FileReader::try_new(&mut r, None)?;
|
||||
let schema = (*arrow_file_reader.schema()).clone();
|
||||
|
||||
arrow_file_reader
|
||||
|
||||
@ -13,7 +13,7 @@ pub async fn to_mem_table(
|
||||
) -> Result<datafusion::datasource::MemTable, ColumnQError> {
|
||||
debug!("loading arrow table data...");
|
||||
let mut schema_and_partitions = partitions_from_table_source!(t, |mut r| {
|
||||
let arrow_stream_reader = arrow::ipc::reader::StreamReader::try_new(&mut r)?;
|
||||
let arrow_stream_reader = arrow::ipc::reader::StreamReader::try_new(&mut r, None)?;
|
||||
let schema = (*arrow_stream_reader.schema()).clone();
|
||||
|
||||
arrow_stream_reader
|
||||
|
||||
116
columnq/src/table/database.rs
Normal file
116
columnq/src/table/database.rs
Normal file
@ -0,0 +1,116 @@
|
||||
pub enum DatabaseLoader {
|
||||
MySQL,
|
||||
SQLite,
|
||||
Postgres,
|
||||
}
|
||||
|
||||
#[cfg(feature = "database")]
|
||||
mod imp {
|
||||
use crate::error::ColumnQError;
|
||||
use crate::table::TableSource;
|
||||
use connectorx::prelude::*;
|
||||
use connectorx::sources::mysql::BinaryProtocol;
|
||||
use datafusion::arrow::record_batch::RecordBatch;
|
||||
use log::debug;
|
||||
|
||||
use super::DatabaseLoader;
|
||||
|
||||
impl DatabaseLoader {
|
||||
pub fn to_mem_table(
|
||||
&self,
|
||||
t: &TableSource,
|
||||
) -> Result<datafusion::datasource::MemTable, ColumnQError> {
|
||||
debug!("loading database table data...");
|
||||
let queries = &[format!("SELECT * FROM {}", t.name)];
|
||||
let mut destination = ArrowDestination::new();
|
||||
match self {
|
||||
DatabaseLoader::MySQL => {
|
||||
let source = MySQLSource::<BinaryProtocol>::new(t.get_uri_str(), 2)
|
||||
.map_err(|e| ColumnQError::Database(e.to_string()))?;
|
||||
let dispatcher =
|
||||
Dispatcher::<
|
||||
MySQLSource<BinaryProtocol>,
|
||||
ArrowDestination,
|
||||
MySQLArrowTransport<BinaryProtocol>,
|
||||
>::new(source, &mut destination, queries, None);
|
||||
dispatcher
|
||||
.run()
|
||||
.map_err(|e| ColumnQError::Database(e.to_string()))?;
|
||||
}
|
||||
DatabaseLoader::SQLite => {
|
||||
let uri = t.get_uri_str().replace("sqlite://", "");
|
||||
let source = SQLiteSource::new(&uri, 2)
|
||||
.map_err(|e| ColumnQError::Database(e.to_string()))?;
|
||||
let dispatcher = Dispatcher::<
|
||||
SQLiteSource,
|
||||
ArrowDestination,
|
||||
SQLiteArrowTransport,
|
||||
>::new(
|
||||
source, &mut destination, queries, None
|
||||
);
|
||||
dispatcher
|
||||
.run()
|
||||
.map_err(|e| ColumnQError::Database(e.to_string()))?;
|
||||
}
|
||||
DatabaseLoader::Postgres => {
|
||||
// ToDo `Cannot start a runtime from within a runtime` error in `connector-x PostgresSource`
|
||||
return Err(ColumnQError::Database(
|
||||
"Postgres database features not be supported for now.".to_string(),
|
||||
));
|
||||
}
|
||||
};
|
||||
let schema_ref = destination.arrow_schema();
|
||||
let data: Vec<RecordBatch> = destination.arrow().unwrap();
|
||||
Ok(datafusion::datasource::MemTable::try_new(
|
||||
schema_ref,
|
||||
vec![data],
|
||||
)?)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(not(feature = "database"))]
|
||||
mod imp {
|
||||
use crate::error::ColumnQError;
|
||||
use crate::table::TableSource;
|
||||
|
||||
use super::DatabaseLoader;
|
||||
|
||||
impl DatabaseLoader {
|
||||
pub fn to_mem_table(
|
||||
&self,
|
||||
_t: &TableSource,
|
||||
) -> Result<datafusion::datasource::MemTable, ColumnQError> {
|
||||
Err(ColumnQError::Database(
|
||||
"Enable 'database' feature flag to support this".to_string(),
|
||||
))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub use imp::*;
|
||||
|
||||
#[cfg(feature = "database")]
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use datafusion::datasource::TableProvider;
|
||||
use dotenv::dotenv;
|
||||
use std::env;
|
||||
|
||||
use crate::table::TableSource;
|
||||
|
||||
use super::*;
|
||||
|
||||
#[tokio::test]
|
||||
async fn load_mysql() -> anyhow::Result<()> {
|
||||
dotenv().ok();
|
||||
if let Ok(name) = env::var("TABLE_NAME") {
|
||||
let t = DatabaseLoader::MySQL
|
||||
.to_mem_table(&TableSource::new(name, env::var("MYSQL_URL")?))?;
|
||||
let stats = t.scan(&None, &[], None).await?.statistics();
|
||||
assert!(stats.num_rows.is_some());
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
@ -16,6 +16,7 @@ use crate::error::ColumnQError;
|
||||
pub mod arrow_ipc_file;
|
||||
pub mod arrow_ipc_stream;
|
||||
pub mod csv;
|
||||
pub mod database;
|
||||
pub mod delta;
|
||||
pub mod google_spreadsheets;
|
||||
pub mod json;
|
||||
@ -108,12 +109,14 @@ impl TableOptionCsv {
|
||||
}
|
||||
|
||||
#[inline]
|
||||
#[must_use]
|
||||
pub fn with_delimiter(mut self, d: u8) -> Self {
|
||||
self.delimiter = d;
|
||||
self
|
||||
}
|
||||
|
||||
#[inline]
|
||||
#[must_use]
|
||||
pub fn with_has_header(mut self, has_header: bool) -> Self {
|
||||
self.has_header = has_header;
|
||||
self
|
||||
@ -208,6 +211,9 @@ pub enum TableLoadOption {
|
||||
delta(TableOptionDelta),
|
||||
arrow {},
|
||||
arrows {},
|
||||
mysql {},
|
||||
sqlite {},
|
||||
postgres {},
|
||||
}
|
||||
|
||||
impl TableLoadOption {
|
||||
@ -250,6 +256,9 @@ impl TableLoadOption {
|
||||
Self::google_spreadsheet(_) | Self::delta { .. } => "",
|
||||
Self::arrow { .. } => "arrow",
|
||||
Self::arrows { .. } => "arrows",
|
||||
Self::mysql { .. } => "mysql",
|
||||
Self::sqlite { .. } => "sqlite",
|
||||
Self::postgres { .. } => "postgres",
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -314,11 +323,13 @@ impl From<KeyValueSource> for TableSource {
|
||||
|
||||
impl TableSource {
|
||||
pub fn new(name: impl Into<String>, source: impl Into<TableIoSource>) -> Self {
|
||||
let io_source = source.into();
|
||||
let option = Self::parse_option(&io_source);
|
||||
Self {
|
||||
name: name.into(),
|
||||
io_source: source.into(),
|
||||
io_source,
|
||||
schema: None,
|
||||
option: None,
|
||||
option,
|
||||
batch_size: Self::default_batch_size(),
|
||||
}
|
||||
}
|
||||
@ -332,11 +343,13 @@ impl TableSource {
|
||||
8192
|
||||
}
|
||||
|
||||
#[must_use]
|
||||
pub fn with_option(mut self, option: impl Into<TableLoadOption>) -> Self {
|
||||
self.option = Some(option.into());
|
||||
self
|
||||
}
|
||||
|
||||
#[must_use]
|
||||
pub fn with_schema(mut self, schema: impl Into<TableSchema>) -> Self {
|
||||
self.schema = Some(schema.into());
|
||||
self
|
||||
@ -349,6 +362,22 @@ impl TableSource {
|
||||
}
|
||||
}
|
||||
|
||||
pub fn parse_option(source: &TableIoSource) -> Option<TableLoadOption> {
|
||||
match source {
|
||||
TableIoSource::Uri(uri) => {
|
||||
let uri = URIReference::try_from(uri.as_str()).ok()?;
|
||||
let scheme = uri.scheme()?;
|
||||
match scheme.as_str() {
|
||||
"mysql" => Some(TableLoadOption::mysql {}),
|
||||
"sqlite" => Some(TableLoadOption::sqlite {}),
|
||||
"postgresql" => Some(TableLoadOption::postgres {}),
|
||||
_ => None,
|
||||
}
|
||||
}
|
||||
TableIoSource::Memory(_) => None,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn parsed_uri(&self) -> Result<URIReference, ColumnQError> {
|
||||
match &self.io_source {
|
||||
TableIoSource::Uri(uri) => URIReference::try_from(uri.as_str()).map_err(|_| {
|
||||
@ -404,7 +433,7 @@ impl TableSource {
|
||||
(None, TableIoSource::Memory(_)) => {
|
||||
return Err(ColumnQError::Generic(
|
||||
"cannot detect table extension from memory IO source, please specify a format option".to_string()
|
||||
));
|
||||
));
|
||||
}
|
||||
})
|
||||
}
|
||||
@ -423,6 +452,15 @@ pub async fn load(t: &TableSource) -> Result<Arc<dyn TableProvider>, ColumnQErro
|
||||
TableLoadOption::delta { .. } => delta::to_datafusion_table(t).await?,
|
||||
TableLoadOption::arrow { .. } => Arc::new(arrow_ipc_file::to_mem_table(t).await?),
|
||||
TableLoadOption::arrows { .. } => Arc::new(arrow_ipc_stream::to_mem_table(t).await?),
|
||||
TableLoadOption::mysql { .. } => {
|
||||
Arc::new(database::DatabaseLoader::MySQL.to_mem_table(t)?)
|
||||
}
|
||||
TableLoadOption::sqlite { .. } => {
|
||||
Arc::new(database::DatabaseLoader::SQLite.to_mem_table(t)?)
|
||||
}
|
||||
TableLoadOption::postgres { .. } => {
|
||||
Arc::new(database::DatabaseLoader::Postgres.to_mem_table(t)?)
|
||||
}
|
||||
})
|
||||
} else {
|
||||
let t: Arc<dyn TableProvider> = match t.extension()? {
|
||||
@ -538,11 +576,13 @@ impl KeyValueSource {
|
||||
}
|
||||
}
|
||||
|
||||
#[must_use]
|
||||
pub fn with_option(mut self, option: impl Into<TableLoadOption>) -> Self {
|
||||
self.option = Some(option.into());
|
||||
self
|
||||
}
|
||||
|
||||
#[must_use]
|
||||
pub fn with_schema(mut self, schema: impl Into<TableSchema>) -> Self {
|
||||
self.schema = Some(schema.into());
|
||||
self
|
||||
@ -619,4 +659,15 @@ batch_size: 512
|
||||
t
|
||||
);
|
||||
}
|
||||
|
||||
#[cfg(feature = "database")]
|
||||
#[tokio::test]
|
||||
async fn test_load_sqlite_table() -> anyhow::Result<()> {
|
||||
let t = TableSource::new("uk_cities", "sqlite://../test_data/sqlite.db");
|
||||
let table = load(&t).await?;
|
||||
let stats = table.scan(&None, &[], None).await?.statistics();
|
||||
assert_eq!(stats.num_rows, Some(37));
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
@ -7,9 +7,9 @@ use crate::table::{TableLoadOption, TableOptionParquet, TableSource};
|
||||
use datafusion::arrow;
|
||||
use datafusion::arrow::datatypes::Schema;
|
||||
use datafusion::arrow::record_batch::RecordBatch;
|
||||
use datafusion::datafusion_data_access::object_store::local::LocalFileSystem;
|
||||
use datafusion::datasource::file_format::parquet::ParquetFormat;
|
||||
use datafusion::datasource::listing::{ListingOptions, ListingTable, ListingTableConfig};
|
||||
use datafusion::datasource::object_store::local::LocalFileSystem;
|
||||
use datafusion::datasource::TableProvider;
|
||||
use datafusion::parquet::arrow::{ArrowReader, ParquetFileArrowReader};
|
||||
use datafusion::parquet::file::reader::SerializedFileReader;
|
||||
|
||||
@ -6,7 +6,7 @@ use datafusion::arrow::datatypes::{DataType, Field, Schema};
|
||||
use datafusion::arrow::record_batch::RecordBatch;
|
||||
use datafusion::dataframe::DataFrame;
|
||||
use datafusion::datasource::MemTable;
|
||||
use datafusion::execution::context::ExecutionContext;
|
||||
use datafusion::execution::context::SessionContext;
|
||||
|
||||
use crate::table;
|
||||
|
||||
@ -125,17 +125,17 @@ schema:
|
||||
Ok(table::load(&table_source).await?)
|
||||
}
|
||||
|
||||
pub fn register_table_properties(dfctx: &mut ExecutionContext) -> anyhow::Result<()> {
|
||||
pub fn register_table_properties(dfctx: &mut SessionContext) -> anyhow::Result<()> {
|
||||
dfctx.register_table("properties", Arc::new(properties_table()?))?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn register_table_ubuntu_ami(dfctx: &mut ExecutionContext) -> anyhow::Result<()> {
|
||||
pub async fn register_table_ubuntu_ami(dfctx: &mut SessionContext) -> anyhow::Result<()> {
|
||||
dfctx.register_table("ubuntu_ami", ubuntu_ami_table().await?)?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn assert_eq_df(df1: Arc<dyn DataFrame>, df2: Arc<dyn DataFrame>) {
|
||||
pub fn assert_eq_df(df1: Arc<DataFrame>, df2: Arc<DataFrame>) {
|
||||
assert_eq!(
|
||||
format!("{:?}", df1.to_logical_plan()),
|
||||
format!("{:?}", df2.to_logical_plan())
|
||||
|
||||
@ -46,6 +46,7 @@ native-tls-vendored = ["columnq/native-tls-vendored"]
|
||||
native-tls = ["columnq/native-tls"]
|
||||
simd = ["columnq/simd"]
|
||||
snmalloc = ["snmalloc-rs"]
|
||||
database = ["columnq/database"]
|
||||
|
||||
[dev-dependencies]
|
||||
reqwest = { version = "0.11", default-features = false, features = [
|
||||
|
||||
BIN
test_data/sqlite.db
Normal file
BIN
test_data/sqlite.db
Normal file
Binary file not shown.
Loading…
Reference in New Issue
Block a user