From c1e26a84d1cd23f0776655707c3fb0936a0b5a26 Mon Sep 17 00:00:00 2001 From: QP Hou Date: Mon, 2 Dec 2024 06:49:16 -0800 Subject: [PATCH] enable continuous background refresh for delta tables (#352) --- Cargo.lock | 4 +- columnq-cli/src/main.rs | 4 +- columnq/Cargo.toml | 2 +- columnq/src/columnq.rs | 91 +++++++++++++++-- columnq/src/table/arrow_ipc_file.rs | 11 +- columnq/src/table/arrow_ipc_stream.rs | 11 +- columnq/src/table/csv.rs | 10 +- columnq/src/table/delta.rs | 49 +++++++-- columnq/src/table/excel.rs | 9 +- columnq/src/table/json.rs | 11 +- columnq/src/table/mod.rs | 139 ++++++++++++++++++-------- columnq/src/table/ndjson.rs | 11 +- columnq/src/table/parquet.rs | 14 +-- columnq/src/test_util.rs | 4 +- columnq/tests/table_csv_test.rs | 5 +- roapi/Cargo.toml | 2 +- roapi/src/config.rs | 2 +- roapi/src/context.rs | 21 +++- roapi/src/startup.rs | 27 ++--- 19 files changed, 324 insertions(+), 103 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 33ea8f9..617b8ff 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1407,7 +1407,7 @@ checksum = "d3fd119d74b830634cea2a0f58bbd0d54540518a14397557951e79340abc28c0" [[package]] name = "columnq" -version = "0.9.1" +version = "0.9.2" dependencies = [ "arrow-schema", "bytes", @@ -4724,7 +4724,7 @@ dependencies = [ [[package]] name = "roapi" -version = "0.12.1" +version = "0.12.2" dependencies = [ "arrow-cast", "arrow-flight", diff --git a/columnq-cli/src/main.rs b/columnq-cli/src/main.rs index 3dd6a0a..c118d93 100644 --- a/columnq-cli/src/main.rs +++ b/columnq-cli/src/main.rs @@ -88,7 +88,7 @@ async fn console_loop(cq: &ColumnQ) -> anyhow::Result<()> { async fn cmd_console(args: &clap::ArgMatches) -> anyhow::Result<()> { let config = SessionConfig::default().with_information_schema(true); - let mut cq = ColumnQ::new_with_config(config); + let mut cq = ColumnQ::new_with_config(config, true); if let Some(tables) = args.get_many::<&str>("table") { for v in tables { @@ -108,7 +108,7 @@ fn bytes_to_stdout(bytes: &[u8]) -> anyhow::Result<()> { async fn cmd_sql(args: &clap::ArgMatches) -> anyhow::Result<()> { let config = SessionConfig::default().with_information_schema(true); - let mut cq = ColumnQ::new_with_config(config); + let mut cq = ColumnQ::new_with_config(config, true); if let Some(tables) = args.get_many::<&str>("table") { for v in tables { diff --git a/columnq/Cargo.toml b/columnq/Cargo.toml index 271f00f..bdbdb55 100644 --- a/columnq/Cargo.toml +++ b/columnq/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "columnq" -version = "0.9.1" +version = "0.9.2" homepage = "https://github.com/roapi/roapi" license = "MIT" authors = ["QP Hou "] diff --git a/columnq/src/columnq.rs b/columnq/src/columnq.rs index e698731..8439bc92 100644 --- a/columnq/src/columnq.rs +++ b/columnq/src/columnq.rs @@ -6,6 +6,7 @@ use std::sync::Once; use datafusion::arrow; use datafusion::arrow::array::as_string_array; use datafusion::arrow::array::StringArray; +use datafusion::datasource::TableProvider; use datafusion::error::DataFusionError; use datafusion::error::Result as DatafusionResult; pub use datafusion::execution::context::SessionConfig; @@ -17,12 +18,14 @@ use object_store::azure::MicrosoftAzureBuilder; use object_store::gcp::GoogleCloudStorageBuilder; use object_store::DynObjectStore; use object_store::ObjectStore; +use tokio::sync::mpsc; use url::Url; use crate::error::{ColumnQError, QueryError}; use crate::io::BlobStoreType; use crate::query; use crate::table::TableIoSource; +use crate::table::TableRefresher; use crate::table::{self, KeyValueSource, TableSource}; static START: Once = Once::new(); @@ -31,6 +34,9 @@ pub struct ColumnQ { pub dfctx: SessionContext, schema_map: HashMap, kv_catalog: HashMap>>, + read_only: bool, + refresh_rx: mpsc::Receiver<(String, Arc)>, + refresh_tx: mpsc::Sender<(String, Arc)>, } impl ColumnQ { @@ -39,10 +45,20 @@ impl ColumnQ { SessionConfig::from_env() .expect("Valid environment variables should be set to create SessionConfig") .with_information_schema(true), + true, ) } - pub fn new_with_config(config: SessionConfig) -> Self { + pub fn new_with_read_only(read_only: bool) -> Self { + Self::new_with_config( + SessionConfig::from_env() + .expect("Valid environment variables should be set to create SessionConfig") + .with_information_schema(true), + read_only, + ) + } + + pub fn new_with_config(config: SessionConfig, read_only: bool) -> Self { START.call_once(|| { deltalake::aws::register_handlers(None); deltalake::azure::register_handlers(None); @@ -58,17 +74,71 @@ impl ColumnQ { false, ); let rn_config = RuntimeConfig::new(); - let runtime_env = RuntimeEnv::new(rn_config).unwrap(); + let runtime_env = + RuntimeEnv::new(rn_config).expect("failed to create datafusion runtime env"); let dfctx = SessionContext::new_with_config_rt(config, Arc::new(runtime_env)); - let schema_map = HashMap::::new(); + let (refresh_tx, refresh_rx) = mpsc::channel(1024); + Self { dfctx, schema_map, kv_catalog: HashMap::new(), + refresh_rx, + refresh_tx, + read_only, } } + fn register_table( + &mut self, + name: impl Into, + table: Arc, + ) -> Result<(), ColumnQError> { + let name = name.into(); + let schema = table.schema(); + self.dfctx.deregister_table(name.as_str())?; + self.dfctx.register_table(name.as_str(), table)?; + self.schema_map.insert(name, schema); + + Ok(()) + } + + pub async fn refresh_tables(&mut self) -> Result<(), ColumnQError> { + while let Ok((name, table)) = self.refresh_rx.try_recv() { + log::debug!("refreshing table {name:?}..."); + self.register_table(name, table)?; + } + Ok(()) + } + + fn register_refresher( + &mut self, + name: impl Into, + mut refresher: TableRefresher, + interval: std::time::Duration, + ) { + let tx = self.refresh_tx.clone(); + let name = name.into(); + let _handle = tokio::task::spawn(async move { + loop { + tokio::time::sleep(interval).await; + match refresher().await { + Ok(table) => { + log::debug!("sending newly refreshed table {name:?}"); + if tx.send((name.clone(), table)).await.is_err() { + log::info!("receiver dropped, ending refresh loop for table {name:?}"); + break; + } + } + Err(e) => { + log::error!("failed to refresh table {name:?}: {e:?}"); + } + } + } + }); + } + pub async fn load_table(&mut self, t: &TableSource) -> Result<(), ColumnQError> { match &t.io_source { TableIoSource::Uri(uri_str) => { @@ -102,10 +172,15 @@ impl ColumnQ { TableIoSource::Memory(_) => {} }; - let table = table::load(t, &self.dfctx).await?; - self.schema_map.insert(t.name.clone(), table.schema()); - self.dfctx.deregister_table(t.name.as_str())?; - self.dfctx.register_table(t.name.as_str(), table)?; + let loaded_table = table::load(t, &self.dfctx).await?; + self.register_table(t.name.to_string(), loaded_table.table)?; + + if !self.read_only { + if let (Some(refresher), Some(interval)) = (loaded_table.refresher, t.refresh_interval) + { + self.register_refresher(t.name.to_string(), refresher, interval); + } + } Ok(()) } @@ -178,7 +253,7 @@ impl ColumnQ { let kv_entry = self.kv_catalog.entry(kv.name.clone()); let (key, value) = (kv.key.clone(), kv.value.clone()); - let table = table::load(&kv.into(), &self.dfctx).await?; + let table = table::load(&kv.into(), &self.dfctx).await?.table; let schema = table.schema(); let key_schema_idx = schema.index_of(&key)?; if schema.field(key_schema_idx).data_type() != &DataType::Utf8 { diff --git a/columnq/src/table/arrow_ipc_file.rs b/columnq/src/table/arrow_ipc_file.rs index cb7edd4..983d7b1 100644 --- a/columnq/src/table/arrow_ipc_file.rs +++ b/columnq/src/table/arrow_ipc_file.rs @@ -6,7 +6,7 @@ use datafusion::arrow::record_batch::RecordBatch; use log::debug; use snafu::prelude::*; -use crate::table::{self, TableSource}; +use crate::table::{self, LoadedTable, TableSource}; #[derive(Debug, Snafu)] pub enum Error { @@ -71,6 +71,15 @@ pub async fn to_mem_table( .context(table::CreateMemTableSnafu) } +pub async fn to_datafusion_table( + t: &TableSource, + dfctx: &datafusion::execution::context::SessionContext, +) -> Result { + Ok(LoadedTable::new_from_table(Arc::new( + to_mem_table(t, dfctx).await?, + ))) +} + #[cfg(test)] mod tests { use super::*; diff --git a/columnq/src/table/arrow_ipc_stream.rs b/columnq/src/table/arrow_ipc_stream.rs index b01e915..010511e 100644 --- a/columnq/src/table/arrow_ipc_stream.rs +++ b/columnq/src/table/arrow_ipc_stream.rs @@ -6,7 +6,7 @@ use datafusion::arrow::record_batch::RecordBatch; use log::debug; use snafu::prelude::*; -use crate::table::{self, TableSource}; +use crate::table::{self, LoadedTable, TableSource}; #[derive(Debug, Snafu)] pub enum Error { @@ -71,6 +71,15 @@ pub async fn to_mem_table( .context(table::CreateMemTableSnafu) } +pub async fn to_datafusion_table( + t: &TableSource, + dfctx: &datafusion::execution::context::SessionContext, +) -> Result { + Ok(LoadedTable::new_from_table(Arc::new( + to_mem_table(t, dfctx).await?, + ))) +} + #[cfg(test)] mod tests { use super::*; diff --git a/columnq/src/table/csv.rs b/columnq/src/table/csv.rs index 1e40440..e9378d8 100644 --- a/columnq/src/table/csv.rs +++ b/columnq/src/table/csv.rs @@ -11,7 +11,7 @@ use log::debug; use snafu::prelude::*; use crate::table::{ - self, datafusion_get_or_infer_schema, TableLoadOption, TableOptionCsv, TableSource, + self, datafusion_get_or_infer_schema, LoadedTable, TableLoadOption, TableOptionCsv, TableSource, }; #[derive(Debug, Snafu)] @@ -37,7 +37,7 @@ pub enum Error { pub async fn to_datafusion_table( t: &TableSource, dfctx: &datafusion::execution::context::SessionContext, -) -> Result, table::Error> { +) -> Result { let opt = t .option .clone() @@ -46,7 +46,7 @@ pub async fn to_datafusion_table( .as_csv() .expect("Invalid table format option, expect csv"); if opt.use_memory_table { - return to_mem_table(t, dfctx).await; + return Ok(LoadedTable::new_from_table(to_mem_table(t, dfctx).await?)); } let table_url = ListingTableUrl::parse(t.get_uri_str()).with_context(|_| table::ListingTableUriSnafu { @@ -69,9 +69,9 @@ pub async fn to_datafusion_table( let table_config = ListingTableConfig::new(table_url) .with_listing_options(options) .with_schema(schemaref); - Ok(Arc::new( + Ok(LoadedTable::new_from_table(Arc::new( ListingTable::try_new(table_config).context(table::CreateListingTableSnafu)?, - )) + ))) } pub async fn to_mem_table( diff --git a/columnq/src/table/delta.rs b/columnq/src/table/delta.rs index 0fa6e1f..1a7b744 100644 --- a/columnq/src/table/delta.rs +++ b/columnq/src/table/delta.rs @@ -1,14 +1,15 @@ +use snafu::prelude::*; +use std::io::Read; +use std::sync::Arc; + use datafusion::arrow; use datafusion::arrow::record_batch::RecordBatch; use datafusion::datasource::TableProvider; use datafusion::parquet::arrow::arrow_reader::ArrowReaderOptions; use datafusion::parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder; -use snafu::prelude::*; -use std::io::Read; -use std::sync::Arc; use crate::io::{self, BlobStoreType}; -use crate::table::{self, TableLoadOption, TableOptionDelta, TableSource}; +use crate::table::{self, LoadedTable, TableLoadOption, TableOptionDelta, TableSource}; #[derive(Debug, Snafu)] pub enum Error { @@ -46,12 +47,28 @@ pub enum Error { LoadTable { source: deltalake::errors::DeltaTableError, }, + #[snafu(display("Failed to update table: {source}"))] + UpdateTable { + source: deltalake::errors::DeltaTableError, + }, +} + +async fn update_table( + mut t: deltalake::DeltaTable, +) -> Result, table::Error> { + t + // TODO: find a way to not do a full table update? + .update() + .await + .context(UpdateTableSnafu) + .context(table::LoadDeltaSnafu)?; + Ok(Arc::new(t) as Arc) } pub async fn to_datafusion_table( t: &TableSource, dfctx: &datafusion::execution::context::SessionContext, -) -> Result, table::Error> { +) -> Result { let opt = t .option .clone() @@ -74,13 +91,23 @@ pub async fn to_datafusion_table( let batch_size = t.batch_size; if *use_memory_table { - to_mem_table(delta_table, blob_type, batch_size, dfctx).await + Ok(LoadedTable::new_from_table( + to_mem_table(delta_table, blob_type, batch_size, dfctx).await?, + )) } else { - to_delta_table(delta_table, blob_type).await + let curr_table = delta_table.clone(); + let df_table = cast_datafusion_table(delta_table, blob_type)?; + Ok(LoadedTable::new( + df_table, + Box::new(move || { + let next_table = curr_table.clone(); + Box::pin(update_table(next_table)) + }), + )) } } -pub async fn to_delta_table( +fn cast_datafusion_table( delta_table: deltalake::DeltaTable, blob_type: io::BlobStoreType, ) -> Result, table::Error> { @@ -208,7 +235,8 @@ mod tests { &ctx, ) .await - .unwrap(); + .unwrap() + .table; validate_statistics( t.scan(&ctx.state(), None, &[], None) @@ -235,7 +263,8 @@ mod tests { &ctx, ) .await - .unwrap(); + .unwrap() + .table; match t.as_any().downcast_ref::() { Some(delta_table) => { diff --git a/columnq/src/table/excel.rs b/columnq/src/table/excel.rs index 55b8d62..7cd2dd1 100644 --- a/columnq/src/table/excel.rs +++ b/columnq/src/table/excel.rs @@ -1,4 +1,3 @@ -use crate::table::{self, TableOptionExcel, TableSchema, TableSource}; use arrow_schema::TimeUnit; use calamine::{open_workbook_auto, DataType as ExcelDataType, Range, Reader, Sheets}; use datafusion::arrow::array::{ @@ -14,6 +13,8 @@ use std::collections::HashMap; use std::sync::Arc; use std::vec; +use crate::table::{self, LoadedTable, TableOptionExcel, TableSchema, TableSource}; + #[derive(Debug, Snafu)] pub enum Error { #[snafu(display("Failed to load Excel: {msg}"))] @@ -380,6 +381,12 @@ pub async fn to_mem_table( } } +pub async fn to_datafusion_table(t: &TableSource) -> Result { + Ok(LoadedTable::new_from_table(Arc::new( + to_mem_table(t).await?, + ))) +} + #[cfg(test)] mod tests { use super::*; diff --git a/columnq/src/table/json.rs b/columnq/src/table/json.rs index 0568998..907ff0c 100644 --- a/columnq/src/table/json.rs +++ b/columnq/src/table/json.rs @@ -9,7 +9,7 @@ use datafusion::arrow::record_batch::RecordBatch; use serde_json::value::Value; use snafu::prelude::*; -use crate::table::{self, TableLoadOption, TableSchema, TableSource}; +use crate::table::{self, LoadedTable, TableLoadOption, TableSchema, TableSource}; #[derive(Debug, Snafu)] pub enum Error { @@ -213,6 +213,15 @@ pub async fn to_mem_table( .context(table::CreateMemTableSnafu) } +pub async fn to_datafusion_table( + t: &TableSource, + dfctx: &datafusion::execution::context::SessionContext, +) -> Result { + Ok(LoadedTable::new_from_table(Arc::new( + to_mem_table(t, dfctx).await?, + ))) +} + #[cfg(test)] mod tests { use super::*; diff --git a/columnq/src/table/mod.rs b/columnq/src/table/mod.rs index 051e7a5..ea7a3fe 100644 --- a/columnq/src/table/mod.rs +++ b/columnq/src/table/mod.rs @@ -1,6 +1,8 @@ use std::ffi::OsStr; +use std::future::Future; use std::io::Read; use std::path::Path; +use std::pin::Pin; use std::sync::Arc; use datafusion::arrow; @@ -430,6 +432,8 @@ pub struct TableSource { #[serde(default = "TableSource::default_batch_size")] pub batch_size: usize, pub partition_columns: Option>, + #[serde(default = "TableSource::default_refresh_interval")] + pub refresh_interval: Option, } impl From for TableSource { @@ -442,6 +446,7 @@ impl From for TableSource { option: kv.option, batch_size: Self::default_batch_size(), partition_columns: None, + refresh_interval: None, } } } @@ -458,6 +463,7 @@ impl TableSource { option, batch_size: Self::default_batch_size(), partition_columns: None, + refresh_interval: Self::default_refresh_interval(), } } @@ -478,6 +484,11 @@ impl TableSource { 8192 } + #[inline] + pub fn default_refresh_interval() -> Option { + Some(std::time::Duration::from_secs(60)) + } + #[inline] #[must_use] pub fn with_option(mut self, option: impl Into) -> Self { @@ -504,6 +515,11 @@ impl TableSource { self } + pub fn with_refresh_interval(mut self, interval: std::time::Duration) -> Self { + self.refresh_interval = Some(interval); + self + } + pub fn get_uri_str(&self) -> &str { match &self.io_source { TableIoSource::Uri(uri) => uri.as_str(), @@ -634,65 +650,98 @@ pub async fn datafusion_get_or_infer_schema( }) } +// pub type TableRefresher = Arc< +// tokio::sync::Mutex< +// dyn FnMut() -> Pin, Error>> + Send>> +// + Send, +// >, +// >; +// +pub type TableRefresher = Box< + dyn FnMut() -> Pin, Error>> + Send>> + + Send, +>; + +pub struct LoadedTable { + pub table: Arc, + pub refresher: Option, +} + +impl LoadedTable { + pub fn new(table: Arc, refresher: TableRefresher) -> Self { + Self { + table, + refresher: Some(refresher), + } + } + + pub fn new_from_table(table: Arc) -> Self { + Self { + table, + refresher: None, + } + } +} + pub async fn load( t: &TableSource, dfctx: &datafusion::execution::context::SessionContext, -) -> Result, Error> { +) -> Result { if let Some(opt) = &t.option { Ok(match opt { - TableLoadOption::json { .. } => Arc::new(json::to_mem_table(t, dfctx).await?), + TableLoadOption::json { .. } => json::to_datafusion_table(t, dfctx).await?, TableLoadOption::ndjson { .. } | TableLoadOption::jsonl { .. } => { - Arc::new(ndjson::to_mem_table(t, dfctx).await?) + ndjson::to_datafusion_table(t, dfctx).await? } TableLoadOption::csv { .. } => csv::to_datafusion_table(t, dfctx).await?, TableLoadOption::parquet { .. } => parquet::to_datafusion_table(t, dfctx).await?, TableLoadOption::google_spreadsheet(_) => { - Arc::new(google_spreadsheets::to_mem_table(t).await?) + LoadedTable::new_from_table(Arc::new(google_spreadsheets::to_mem_table(t).await?)) } TableLoadOption::xlsx { .. } | TableLoadOption::xls { .. } | TableLoadOption::xlsb { .. } - | TableLoadOption::ods { .. } => Arc::new(excel::to_mem_table(t).await?), + | TableLoadOption::ods { .. } => excel::to_datafusion_table(t).await?, TableLoadOption::delta { .. } => delta::to_datafusion_table(t, dfctx).await?, - TableLoadOption::arrow { .. } => { - Arc::new(arrow_ipc_file::to_mem_table(t, dfctx).await?) - } + TableLoadOption::arrow { .. } => arrow_ipc_file::to_datafusion_table(t, dfctx).await?, TableLoadOption::arrows { .. } => { - Arc::new(arrow_ipc_stream::to_mem_table(t, dfctx).await?) - } - TableLoadOption::mysql { .. } => { - Arc::new(database::DatabaseLoader::MySQL.to_mem_table(t)?) - } - TableLoadOption::sqlite { .. } => { - Arc::new(database::DatabaseLoader::SQLite.to_mem_table(t)?) - } - TableLoadOption::postgres { .. } => { - Arc::new(database::DatabaseLoader::Postgres.to_mem_table(t)?) + arrow_ipc_stream::to_datafusion_table(t, dfctx).await? } + TableLoadOption::mysql { .. } => LoadedTable::new_from_table(Arc::new( + database::DatabaseLoader::MySQL.to_mem_table(t)?, + )), + TableLoadOption::sqlite { .. } => LoadedTable::new_from_table(Arc::new( + database::DatabaseLoader::SQLite.to_mem_table(t)?, + )), + TableLoadOption::postgres { .. } => LoadedTable::new_from_table(Arc::new( + database::DatabaseLoader::Postgres.to_mem_table(t)?, + )), }) } else { - let t: Arc = match t.extension()? { - "csv" => csv::to_datafusion_table(t, dfctx).await?, - "json" => Arc::new(json::to_mem_table(t, dfctx).await?), - "ndjson" | "jsonl" => Arc::new(ndjson::to_mem_table(t, dfctx).await?), - "parquet" => parquet::to_datafusion_table(t, dfctx).await?, - "xls" | "xlsx" | "xlsb" | "ods" => Arc::new(excel::to_mem_table(t).await?), - "arrow" => Arc::new(arrow_ipc_file::to_mem_table(t, dfctx).await?), - "arrows" => Arc::new(arrow_ipc_stream::to_mem_table(t, dfctx).await?), - "mysql" => Arc::new(database::DatabaseLoader::MySQL.to_mem_table(t)?), - "sqlite" => Arc::new(database::DatabaseLoader::SQLite.to_mem_table(t)?), - "postgresql" => Arc::new(database::DatabaseLoader::Postgres.to_mem_table(t)?), - ext => { - return Err(Error::InvalidUri { - msg: format!( - "failed to register `{}` as table `{}`, unsupported table format `{}`", - t.io_source, t.name, ext, - ), - }); - } - }; - - Ok(t) + match t.extension()? { + "csv" => csv::to_datafusion_table(t, dfctx).await, + "json" => json::to_datafusion_table(t, dfctx).await, + "ndjson" | "jsonl" => ndjson::to_datafusion_table(t, dfctx).await, + "parquet" => parquet::to_datafusion_table(t, dfctx).await, + "xls" | "xlsx" | "xlsb" | "ods" => excel::to_datafusion_table(t).await, + "arrow" => arrow_ipc_file::to_datafusion_table(t, dfctx).await, + "arrows" => arrow_ipc_stream::to_datafusion_table(t, dfctx).await, + "mysql" => Ok(LoadedTable::new_from_table(Arc::new( + database::DatabaseLoader::MySQL.to_mem_table(t)?, + ))), + "sqlite" => Ok(LoadedTable::new_from_table(Arc::new( + database::DatabaseLoader::SQLite.to_mem_table(t)?, + ))), + "postgresql" => Ok(LoadedTable::new_from_table(Arc::new( + database::DatabaseLoader::Postgres.to_mem_table(t)?, + ))), + ext => Err(Error::InvalidUri { + msg: format!( + "failed to register `{}` as table `{}`, unsupported table format `{}`", + t.io_source, t.name, ext, + ), + }), + } } } @@ -903,8 +952,9 @@ schema: let t = TableSource::new("uk_cities", "sqlite://../test_data/sqlite/sample.db"); let ctx = datafusion::prelude::SessionContext::new(); - let table = load(&t, &ctx).await.unwrap(); - let stats = table + let t = load(&t, &ctx).await.unwrap(); + let stats = t + .table .scan(&ctx.state(), None, &[], None) .await .unwrap() @@ -928,8 +978,9 @@ uri: "sqlite://../test_data/sqlite/sample.{}" )) .unwrap(); let ctx = datafusion::prelude::SessionContext::new(); - let table = load(&t, &ctx).await.unwrap(); - let stats = table + let t = load(&t, &ctx).await.unwrap(); + let stats = t + .table .scan(&ctx.state(), None, &[], None) .await .unwrap() diff --git a/columnq/src/table/ndjson.rs b/columnq/src/table/ndjson.rs index 7886939..ee11c71 100644 --- a/columnq/src/table/ndjson.rs +++ b/columnq/src/table/ndjson.rs @@ -7,7 +7,7 @@ use datafusion::arrow::json::reader::{infer_json_schema, ReaderBuilder}; use datafusion::arrow::record_batch::RecordBatch; use snafu::prelude::*; -use crate::table::{self, TableSource}; +use crate::table::{self, LoadedTable, TableSource}; #[derive(Debug, Snafu)] pub enum Error { @@ -89,6 +89,15 @@ pub async fn to_mem_table( .context(table::CreateMemTableSnafu) } +pub async fn to_datafusion_table( + t: &TableSource, + dfctx: &datafusion::execution::context::SessionContext, +) -> Result { + Ok(LoadedTable::new_from_table(Arc::new( + to_mem_table(t, dfctx).await?, + ))) +} + #[cfg(test)] mod tests { use datafusion::{datasource::TableProvider, prelude::SessionContext}; diff --git a/columnq/src/table/parquet.rs b/columnq/src/table/parquet.rs index 35da714..a646872 100644 --- a/columnq/src/table/parquet.rs +++ b/columnq/src/table/parquet.rs @@ -15,7 +15,8 @@ use datafusion::parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder; use snafu::prelude::*; use crate::table::{ - self, datafusion_get_or_infer_schema, TableLoadOption, TableOptionParquet, TableSource, + self, datafusion_get_or_infer_schema, LoadedTable, TableLoadOption, TableOptionParquet, + TableSource, }; #[derive(Debug, Snafu)] @@ -47,7 +48,7 @@ pub enum Error { pub async fn to_datafusion_table( t: &TableSource, dfctx: &datafusion::execution::context::SessionContext, -) -> Result, table::Error> { +) -> Result { let opt = t .option .clone() @@ -55,7 +56,7 @@ pub async fn to_datafusion_table( let TableOptionParquet { use_memory_table } = opt.as_parquet()?; if *use_memory_table { - to_mem_table(t, dfctx).await + Ok(LoadedTable::new_from_table(to_mem_table(t, dfctx).await?)) } else { let table_url = ListingTableUrl::parse(t.get_uri_str()) .context(ParseUriSnafu) @@ -77,9 +78,9 @@ pub async fn to_datafusion_table( let table_config = ListingTableConfig::new(table_url) .with_listing_options(options) .with_schema(schemaref); - Ok(Arc::new( + Ok(LoadedTable::new_from_table(Arc::new( ListingTable::try_new(table_config).context(table::CreateListingTableSnafu)?, - )) + ))) } } @@ -178,6 +179,7 @@ mod tests { .unwrap(); let stats = t + .table .scan(&ctx.state(), None, &[], None) .await .unwrap() @@ -189,7 +191,7 @@ mod tests { assert_eq!(stats[1].null_count, Precision::Exact(373)); assert_eq!(stats[2].null_count, Precision::Exact(237)); - match t.as_any().downcast_ref::() { + match t.table.as_any().downcast_ref::() { Some(_) => {} None => panic!("must be of type datafusion::datasource::listing::ListingTable"), } diff --git a/columnq/src/test_util.rs b/columnq/src/test_util.rs index 3d895a3..fce1004 100644 --- a/columnq/src/test_util.rs +++ b/columnq/src/test_util.rs @@ -126,8 +126,8 @@ schema: // patch uri path with the correct test data path table_source.io_source = table::TableIoSource::Uri(test_data_path("ubuntu-ami.json")); let ctx = SessionContext::new(); - let t = table::load(&table_source, &ctx).await; - Ok(whatever!(t, "failed to load table")) + let t = table::load(&table_source, &ctx).await.unwrap(); + Ok(t.table) } pub fn register_table_properties(dfctx: &mut SessionContext) { diff --git a/columnq/tests/table_csv_test.rs b/columnq/tests/table_csv_test.rs index 34bdf50..3d9dabe 100644 --- a/columnq/tests/table_csv_test.rs +++ b/columnq/tests/table_csv_test.rs @@ -52,5 +52,8 @@ async fn infer_csv_schema_by_selected_files() { builder.push(Field::new("ts", DataType::Int64, true)); builder.push(Field::new("value", DataType::Float64, true)); - assert_eq!(t.schema(), Arc::new(Schema::new(builder.finish().fields))); + assert_eq!( + t.table.schema(), + Arc::new(Schema::new(builder.finish().fields)) + ); } diff --git a/roapi/Cargo.toml b/roapi/Cargo.toml index 0448edc..9f8d974 100644 --- a/roapi/Cargo.toml +++ b/roapi/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "roapi" -version = "0.12.1" +version = "0.12.2" authors = ["QP Hou "] homepage = "https://github.com/roapi/roapi" license = "MIT" diff --git a/roapi/src/config.rs b/roapi/src/config.rs index 67fdeeb..02f2ff1 100644 --- a/roapi/src/config.rs +++ b/roapi/src/config.rs @@ -99,7 +99,7 @@ fn address_flight_sql_arg() -> clap::Arg { fn read_only_arg() -> clap::Arg { clap::Arg::new("disable-read-only") - .help("Start roapi in read write mode") + .help("Start roapi in read write mode, allowing tables to be updated at runtime") .required(false) .num_args(0) .long("disable-read-only") diff --git a/roapi/src/context.rs b/roapi/src/context.rs index 79a5dfe..3aaa777 100644 --- a/roapi/src/context.rs +++ b/roapi/src/context.rs @@ -25,10 +25,10 @@ pub struct RawRoapiContext { } impl RawRoapiContext { - pub async fn new(config: &Config) -> Result { + pub async fn new(config: &Config, read_only: bool) -> Result { let mut cq = match config.get_datafusion_config() { - Ok(df_cfg) => ColumnQ::new_with_config(df_cfg), - _ => ColumnQ::new(), + Ok(df_cfg) => ColumnQ::new_with_config(df_cfg, read_only), + _ => ColumnQ::new_with_read_only(read_only), }; if config.tables.is_empty() && config.kvstores.is_empty() { @@ -94,6 +94,8 @@ pub trait RoapiContext: Send + Sync + 'static { async fn get_response_format(&self) -> encoding::ContentType; async fn get_dfctx(&self) -> SessionContext; + + async fn refresh_tables(&self) -> Result<(), ColumnQError>; } #[async_trait] @@ -203,6 +205,13 @@ impl RoapiContext for RawRoapiContext { async fn get_dfctx(&self) -> SessionContext { self.cq.dfctx.clone() } + + #[inline] + async fn refresh_tables(&self) -> Result<(), ColumnQError> { + Err(ColumnQError::Generic( + "Table refresh not supported in read only mode".to_string(), + )) + } } #[async_trait] @@ -322,4 +331,10 @@ impl RoapiContext for ConcurrentRoapiContext { let ctx = self.read().await; ctx.cq.dfctx.clone() } + + #[inline] + async fn refresh_tables(&self) -> Result<(), ColumnQError> { + let mut ctx = self.write().await; + ctx.cq.refresh_tables().await + } } diff --git a/roapi/src/startup.rs b/roapi/src/startup.rs index 5cfa73d..1fa5221 100644 --- a/roapi/src/startup.rs +++ b/roapi/src/startup.rs @@ -21,6 +21,7 @@ pub enum Error { BuildFlightSqlServer { source: server::flight_sql::Error }, } +// TODO: replace table reloader with the new concurrent refresh infra pub struct TableReloader { reload_interval: Duration, ctx_ext: Arc>, @@ -58,7 +59,7 @@ impl Application { pub async fn build(config: Config) -> Result { let default_host = std::env::var("HOST").unwrap_or_else(|_| "127.0.0.1".to_string()); - let handler_ctx = RawRoapiContext::new(&config) + let handler_ctx = RawRoapiContext::new(&config, !config.disable_read_only) .await .expect("Failed to create Roapi context"); @@ -70,7 +71,7 @@ impl Application { let tables = Arc::new(Mutex::new(tables)); if config.disable_read_only { - let ctx_ext = Arc::new(RwLock::new(handler_ctx)); + let ctx_ext: Arc = Arc::new(RwLock::new(handler_ctx)); let postgres_server = Box::new( server::postgres::PostgresServer::new( ctx_ext.clone(), @@ -97,14 +98,18 @@ impl Application { ); let (http_server, http_addr) = - server::http::build_http_server::( - ctx_ext, - tables, - &config, - default_host, - ) - .await - .context(BuildHttpServerSnafu)?; + server::http::build_http_server(ctx_ext.clone(), tables, &config, default_host) + .await + .context(BuildHttpServerSnafu)?; + + let _handle = tokio::task::spawn(async move { + loop { + if let Err(e) = ctx_ext.refresh_tables().await { + error!("Failed to refresh table: {:?}", e); + } + time::sleep(Duration::from_millis(1000)).await; + } + }); Ok(Self { http_addr, @@ -164,8 +169,6 @@ impl Application { } pub async fn run_until_stopped(self) -> Result<(), Error> { - // FIXME: exit from tokio spawn - let postgres_server = self.postgres_server; info!( "🚀 Listening on {} for Postgres traffic...",