enable continuous background refresh for delta tables (#352)
Some checks failed
build / build (push) Has been cancelled
build / database_test (push) Has been cancelled
build / object_store_memory_test (push) Has been cancelled
build / object_store_direct_test (push) Has been cancelled
build / openssl_build (push) Has been cancelled
build / mac_cross_build (push) Has been cancelled
build / Docker Image Build (push) Has been cancelled
columnq-cli release / Validate git tag (push) Has been cancelled
roapi release / Validate git tag (push) Has been cancelled
columnq-cli release / macos (push) Has been cancelled
columnq-cli release / windows (map[features:database-sqlite python-architecture:x64 target:x86_64-pc-windows-msvc]) (push) Has been cancelled
columnq-cli release / linux (map[features:rustls,database-sqlite image_tag:aarch64-musl manylinux:2014 name_suffix: rustflags: target:aarch64-unknown-linux-musl upload:true]) (push) Has been cancelled
columnq-cli release / linux (map[features:rustls,database-sqlite image_tag:x86_64-musl manylinux:2010 name_suffix: rustflags:-C target-cpu=skylake target:x86_64-unknown-linux-musl upload:true]) (push) Has been cancelled
columnq-cli release / PyPI Release (push) Has been cancelled
roapi release / macos (push) Has been cancelled
roapi release / windows (map[features:database-sqlite python-architecture:x64 target:x86_64-pc-windows-msvc]) (push) Has been cancelled
roapi release / linux (map[features:rustls,database-sqlite image_tag:aarch64-musl manylinux:2014 name_suffix: rustflags: target:aarch64-unknown-linux-musl upload:true]) (push) Has been cancelled
roapi release / linux (map[features:rustls,database-sqlite image_tag:x86_64-musl manylinux:2010 name_suffix: rustflags:-C target-cpu=skylake target:x86_64-unknown-linux-musl upload:true]) (push) Has been cancelled
roapi release / PyPI Release (push) Has been cancelled
roapi release / Docker Image Release (push) Has been cancelled

This commit is contained in:
QP Hou 2024-12-02 06:49:16 -08:00 committed by GitHub
parent 85c55c9951
commit c1e26a84d1
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
19 changed files with 324 additions and 103 deletions

4
Cargo.lock generated
View File

@ -1407,7 +1407,7 @@ checksum = "d3fd119d74b830634cea2a0f58bbd0d54540518a14397557951e79340abc28c0"
[[package]]
name = "columnq"
version = "0.9.1"
version = "0.9.2"
dependencies = [
"arrow-schema",
"bytes",
@ -4724,7 +4724,7 @@ dependencies = [
[[package]]
name = "roapi"
version = "0.12.1"
version = "0.12.2"
dependencies = [
"arrow-cast",
"arrow-flight",

View File

@ -88,7 +88,7 @@ async fn console_loop(cq: &ColumnQ) -> anyhow::Result<()> {
async fn cmd_console(args: &clap::ArgMatches) -> anyhow::Result<()> {
let config = SessionConfig::default().with_information_schema(true);
let mut cq = ColumnQ::new_with_config(config);
let mut cq = ColumnQ::new_with_config(config, true);
if let Some(tables) = args.get_many::<&str>("table") {
for v in tables {
@ -108,7 +108,7 @@ fn bytes_to_stdout(bytes: &[u8]) -> anyhow::Result<()> {
async fn cmd_sql(args: &clap::ArgMatches) -> anyhow::Result<()> {
let config = SessionConfig::default().with_information_schema(true);
let mut cq = ColumnQ::new_with_config(config);
let mut cq = ColumnQ::new_with_config(config, true);
if let Some(tables) = args.get_many::<&str>("table") {
for v in tables {

View File

@ -1,6 +1,6 @@
[package]
name = "columnq"
version = "0.9.1"
version = "0.9.2"
homepage = "https://github.com/roapi/roapi"
license = "MIT"
authors = ["QP Hou <dave2008713@gmail.com>"]

View File

@ -6,6 +6,7 @@ use std::sync::Once;
use datafusion::arrow;
use datafusion::arrow::array::as_string_array;
use datafusion::arrow::array::StringArray;
use datafusion::datasource::TableProvider;
use datafusion::error::DataFusionError;
use datafusion::error::Result as DatafusionResult;
pub use datafusion::execution::context::SessionConfig;
@ -17,12 +18,14 @@ use object_store::azure::MicrosoftAzureBuilder;
use object_store::gcp::GoogleCloudStorageBuilder;
use object_store::DynObjectStore;
use object_store::ObjectStore;
use tokio::sync::mpsc;
use url::Url;
use crate::error::{ColumnQError, QueryError};
use crate::io::BlobStoreType;
use crate::query;
use crate::table::TableIoSource;
use crate::table::TableRefresher;
use crate::table::{self, KeyValueSource, TableSource};
static START: Once = Once::new();
@ -31,6 +34,9 @@ pub struct ColumnQ {
pub dfctx: SessionContext,
schema_map: HashMap<String, arrow::datatypes::SchemaRef>,
kv_catalog: HashMap<String, Arc<HashMap<String, String>>>,
read_only: bool,
refresh_rx: mpsc::Receiver<(String, Arc<dyn TableProvider>)>,
refresh_tx: mpsc::Sender<(String, Arc<dyn TableProvider>)>,
}
impl ColumnQ {
@ -39,10 +45,20 @@ impl ColumnQ {
SessionConfig::from_env()
.expect("Valid environment variables should be set to create SessionConfig")
.with_information_schema(true),
true,
)
}
pub fn new_with_config(config: SessionConfig) -> Self {
pub fn new_with_read_only(read_only: bool) -> Self {
Self::new_with_config(
SessionConfig::from_env()
.expect("Valid environment variables should be set to create SessionConfig")
.with_information_schema(true),
read_only,
)
}
pub fn new_with_config(config: SessionConfig, read_only: bool) -> Self {
START.call_once(|| {
deltalake::aws::register_handlers(None);
deltalake::azure::register_handlers(None);
@ -58,17 +74,71 @@ impl ColumnQ {
false,
);
let rn_config = RuntimeConfig::new();
let runtime_env = RuntimeEnv::new(rn_config).unwrap();
let runtime_env =
RuntimeEnv::new(rn_config).expect("failed to create datafusion runtime env");
let dfctx = SessionContext::new_with_config_rt(config, Arc::new(runtime_env));
let schema_map = HashMap::<String, arrow::datatypes::SchemaRef>::new();
let (refresh_tx, refresh_rx) = mpsc::channel(1024);
Self {
dfctx,
schema_map,
kv_catalog: HashMap::new(),
refresh_rx,
refresh_tx,
read_only,
}
}
fn register_table(
&mut self,
name: impl Into<String>,
table: Arc<dyn TableProvider>,
) -> Result<(), ColumnQError> {
let name = name.into();
let schema = table.schema();
self.dfctx.deregister_table(name.as_str())?;
self.dfctx.register_table(name.as_str(), table)?;
self.schema_map.insert(name, schema);
Ok(())
}
pub async fn refresh_tables(&mut self) -> Result<(), ColumnQError> {
while let Ok((name, table)) = self.refresh_rx.try_recv() {
log::debug!("refreshing table {name:?}...");
self.register_table(name, table)?;
}
Ok(())
}
fn register_refresher(
&mut self,
name: impl Into<String>,
mut refresher: TableRefresher,
interval: std::time::Duration,
) {
let tx = self.refresh_tx.clone();
let name = name.into();
let _handle = tokio::task::spawn(async move {
loop {
tokio::time::sleep(interval).await;
match refresher().await {
Ok(table) => {
log::debug!("sending newly refreshed table {name:?}");
if tx.send((name.clone(), table)).await.is_err() {
log::info!("receiver dropped, ending refresh loop for table {name:?}");
break;
}
}
Err(e) => {
log::error!("failed to refresh table {name:?}: {e:?}");
}
}
}
});
}
pub async fn load_table(&mut self, t: &TableSource) -> Result<(), ColumnQError> {
match &t.io_source {
TableIoSource::Uri(uri_str) => {
@ -102,10 +172,15 @@ impl ColumnQ {
TableIoSource::Memory(_) => {}
};
let table = table::load(t, &self.dfctx).await?;
self.schema_map.insert(t.name.clone(), table.schema());
self.dfctx.deregister_table(t.name.as_str())?;
self.dfctx.register_table(t.name.as_str(), table)?;
let loaded_table = table::load(t, &self.dfctx).await?;
self.register_table(t.name.to_string(), loaded_table.table)?;
if !self.read_only {
if let (Some(refresher), Some(interval)) = (loaded_table.refresher, t.refresh_interval)
{
self.register_refresher(t.name.to_string(), refresher, interval);
}
}
Ok(())
}
@ -178,7 +253,7 @@ impl ColumnQ {
let kv_entry = self.kv_catalog.entry(kv.name.clone());
let (key, value) = (kv.key.clone(), kv.value.clone());
let table = table::load(&kv.into(), &self.dfctx).await?;
let table = table::load(&kv.into(), &self.dfctx).await?.table;
let schema = table.schema();
let key_schema_idx = schema.index_of(&key)?;
if schema.field(key_schema_idx).data_type() != &DataType::Utf8 {

View File

@ -6,7 +6,7 @@ use datafusion::arrow::record_batch::RecordBatch;
use log::debug;
use snafu::prelude::*;
use crate::table::{self, TableSource};
use crate::table::{self, LoadedTable, TableSource};
#[derive(Debug, Snafu)]
pub enum Error {
@ -71,6 +71,15 @@ pub async fn to_mem_table(
.context(table::CreateMemTableSnafu)
}
pub async fn to_datafusion_table(
t: &TableSource,
dfctx: &datafusion::execution::context::SessionContext,
) -> Result<LoadedTable, table::Error> {
Ok(LoadedTable::new_from_table(Arc::new(
to_mem_table(t, dfctx).await?,
)))
}
#[cfg(test)]
mod tests {
use super::*;

View File

@ -6,7 +6,7 @@ use datafusion::arrow::record_batch::RecordBatch;
use log::debug;
use snafu::prelude::*;
use crate::table::{self, TableSource};
use crate::table::{self, LoadedTable, TableSource};
#[derive(Debug, Snafu)]
pub enum Error {
@ -71,6 +71,15 @@ pub async fn to_mem_table(
.context(table::CreateMemTableSnafu)
}
pub async fn to_datafusion_table(
t: &TableSource,
dfctx: &datafusion::execution::context::SessionContext,
) -> Result<LoadedTable, table::Error> {
Ok(LoadedTable::new_from_table(Arc::new(
to_mem_table(t, dfctx).await?,
)))
}
#[cfg(test)]
mod tests {
use super::*;

View File

@ -11,7 +11,7 @@ use log::debug;
use snafu::prelude::*;
use crate::table::{
self, datafusion_get_or_infer_schema, TableLoadOption, TableOptionCsv, TableSource,
self, datafusion_get_or_infer_schema, LoadedTable, TableLoadOption, TableOptionCsv, TableSource,
};
#[derive(Debug, Snafu)]
@ -37,7 +37,7 @@ pub enum Error {
pub async fn to_datafusion_table(
t: &TableSource,
dfctx: &datafusion::execution::context::SessionContext,
) -> Result<Arc<dyn TableProvider>, table::Error> {
) -> Result<LoadedTable, table::Error> {
let opt = t
.option
.clone()
@ -46,7 +46,7 @@ pub async fn to_datafusion_table(
.as_csv()
.expect("Invalid table format option, expect csv");
if opt.use_memory_table {
return to_mem_table(t, dfctx).await;
return Ok(LoadedTable::new_from_table(to_mem_table(t, dfctx).await?));
}
let table_url =
ListingTableUrl::parse(t.get_uri_str()).with_context(|_| table::ListingTableUriSnafu {
@ -69,9 +69,9 @@ pub async fn to_datafusion_table(
let table_config = ListingTableConfig::new(table_url)
.with_listing_options(options)
.with_schema(schemaref);
Ok(Arc::new(
Ok(LoadedTable::new_from_table(Arc::new(
ListingTable::try_new(table_config).context(table::CreateListingTableSnafu)?,
))
)))
}
pub async fn to_mem_table(

View File

@ -1,14 +1,15 @@
use snafu::prelude::*;
use std::io::Read;
use std::sync::Arc;
use datafusion::arrow;
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::datasource::TableProvider;
use datafusion::parquet::arrow::arrow_reader::ArrowReaderOptions;
use datafusion::parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
use snafu::prelude::*;
use std::io::Read;
use std::sync::Arc;
use crate::io::{self, BlobStoreType};
use crate::table::{self, TableLoadOption, TableOptionDelta, TableSource};
use crate::table::{self, LoadedTable, TableLoadOption, TableOptionDelta, TableSource};
#[derive(Debug, Snafu)]
pub enum Error {
@ -46,12 +47,28 @@ pub enum Error {
LoadTable {
source: deltalake::errors::DeltaTableError,
},
#[snafu(display("Failed to update table: {source}"))]
UpdateTable {
source: deltalake::errors::DeltaTableError,
},
}
async fn update_table(
mut t: deltalake::DeltaTable,
) -> Result<Arc<dyn TableProvider>, table::Error> {
t
// TODO: find a way to not do a full table update?
.update()
.await
.context(UpdateTableSnafu)
.context(table::LoadDeltaSnafu)?;
Ok(Arc::new(t) as Arc<dyn TableProvider>)
}
pub async fn to_datafusion_table(
t: &TableSource,
dfctx: &datafusion::execution::context::SessionContext,
) -> Result<Arc<dyn TableProvider>, table::Error> {
) -> Result<LoadedTable, table::Error> {
let opt = t
.option
.clone()
@ -74,13 +91,23 @@ pub async fn to_datafusion_table(
let batch_size = t.batch_size;
if *use_memory_table {
to_mem_table(delta_table, blob_type, batch_size, dfctx).await
Ok(LoadedTable::new_from_table(
to_mem_table(delta_table, blob_type, batch_size, dfctx).await?,
))
} else {
to_delta_table(delta_table, blob_type).await
let curr_table = delta_table.clone();
let df_table = cast_datafusion_table(delta_table, blob_type)?;
Ok(LoadedTable::new(
df_table,
Box::new(move || {
let next_table = curr_table.clone();
Box::pin(update_table(next_table))
}),
))
}
}
pub async fn to_delta_table(
fn cast_datafusion_table(
delta_table: deltalake::DeltaTable,
blob_type: io::BlobStoreType,
) -> Result<Arc<dyn TableProvider>, table::Error> {
@ -208,7 +235,8 @@ mod tests {
&ctx,
)
.await
.unwrap();
.unwrap()
.table;
validate_statistics(
t.scan(&ctx.state(), None, &[], None)
@ -235,7 +263,8 @@ mod tests {
&ctx,
)
.await
.unwrap();
.unwrap()
.table;
match t.as_any().downcast_ref::<DeltaTable>() {
Some(delta_table) => {

View File

@ -1,4 +1,3 @@
use crate::table::{self, TableOptionExcel, TableSchema, TableSource};
use arrow_schema::TimeUnit;
use calamine::{open_workbook_auto, DataType as ExcelDataType, Range, Reader, Sheets};
use datafusion::arrow::array::{
@ -14,6 +13,8 @@ use std::collections::HashMap;
use std::sync::Arc;
use std::vec;
use crate::table::{self, LoadedTable, TableOptionExcel, TableSchema, TableSource};
#[derive(Debug, Snafu)]
pub enum Error {
#[snafu(display("Failed to load Excel: {msg}"))]
@ -380,6 +381,12 @@ pub async fn to_mem_table(
}
}
pub async fn to_datafusion_table(t: &TableSource) -> Result<LoadedTable, table::Error> {
Ok(LoadedTable::new_from_table(Arc::new(
to_mem_table(t).await?,
)))
}
#[cfg(test)]
mod tests {
use super::*;

View File

@ -9,7 +9,7 @@ use datafusion::arrow::record_batch::RecordBatch;
use serde_json::value::Value;
use snafu::prelude::*;
use crate::table::{self, TableLoadOption, TableSchema, TableSource};
use crate::table::{self, LoadedTable, TableLoadOption, TableSchema, TableSource};
#[derive(Debug, Snafu)]
pub enum Error {
@ -213,6 +213,15 @@ pub async fn to_mem_table(
.context(table::CreateMemTableSnafu)
}
pub async fn to_datafusion_table(
t: &TableSource,
dfctx: &datafusion::execution::context::SessionContext,
) -> Result<LoadedTable, table::Error> {
Ok(LoadedTable::new_from_table(Arc::new(
to_mem_table(t, dfctx).await?,
)))
}
#[cfg(test)]
mod tests {
use super::*;

View File

@ -1,6 +1,8 @@
use std::ffi::OsStr;
use std::future::Future;
use std::io::Read;
use std::path::Path;
use std::pin::Pin;
use std::sync::Arc;
use datafusion::arrow;
@ -430,6 +432,8 @@ pub struct TableSource {
#[serde(default = "TableSource::default_batch_size")]
pub batch_size: usize,
pub partition_columns: Option<Vec<TableColumn>>,
#[serde(default = "TableSource::default_refresh_interval")]
pub refresh_interval: Option<std::time::Duration>,
}
impl From<KeyValueSource> for TableSource {
@ -442,6 +446,7 @@ impl From<KeyValueSource> for TableSource {
option: kv.option,
batch_size: Self::default_batch_size(),
partition_columns: None,
refresh_interval: None,
}
}
}
@ -458,6 +463,7 @@ impl TableSource {
option,
batch_size: Self::default_batch_size(),
partition_columns: None,
refresh_interval: Self::default_refresh_interval(),
}
}
@ -478,6 +484,11 @@ impl TableSource {
8192
}
#[inline]
pub fn default_refresh_interval() -> Option<std::time::Duration> {
Some(std::time::Duration::from_secs(60))
}
#[inline]
#[must_use]
pub fn with_option(mut self, option: impl Into<TableLoadOption>) -> Self {
@ -504,6 +515,11 @@ impl TableSource {
self
}
pub fn with_refresh_interval(mut self, interval: std::time::Duration) -> Self {
self.refresh_interval = Some(interval);
self
}
pub fn get_uri_str(&self) -> &str {
match &self.io_source {
TableIoSource::Uri(uri) => uri.as_str(),
@ -634,65 +650,98 @@ pub async fn datafusion_get_or_infer_schema(
})
}
// pub type TableRefresher = Arc<
// tokio::sync::Mutex<
// dyn FnMut() -> Pin<Box<dyn Future<Output = Result<Arc<dyn TableProvider>, Error>> + Send>>
// + Send,
// >,
// >;
//
pub type TableRefresher = Box<
dyn FnMut() -> Pin<Box<dyn Future<Output = Result<Arc<dyn TableProvider>, Error>> + Send>>
+ Send,
>;
pub struct LoadedTable {
pub table: Arc<dyn TableProvider>,
pub refresher: Option<TableRefresher>,
}
impl LoadedTable {
pub fn new(table: Arc<dyn TableProvider>, refresher: TableRefresher) -> Self {
Self {
table,
refresher: Some(refresher),
}
}
pub fn new_from_table(table: Arc<dyn TableProvider>) -> Self {
Self {
table,
refresher: None,
}
}
}
pub async fn load(
t: &TableSource,
dfctx: &datafusion::execution::context::SessionContext,
) -> Result<Arc<dyn TableProvider>, Error> {
) -> Result<LoadedTable, Error> {
if let Some(opt) = &t.option {
Ok(match opt {
TableLoadOption::json { .. } => Arc::new(json::to_mem_table(t, dfctx).await?),
TableLoadOption::json { .. } => json::to_datafusion_table(t, dfctx).await?,
TableLoadOption::ndjson { .. } | TableLoadOption::jsonl { .. } => {
Arc::new(ndjson::to_mem_table(t, dfctx).await?)
ndjson::to_datafusion_table(t, dfctx).await?
}
TableLoadOption::csv { .. } => csv::to_datafusion_table(t, dfctx).await?,
TableLoadOption::parquet { .. } => parquet::to_datafusion_table(t, dfctx).await?,
TableLoadOption::google_spreadsheet(_) => {
Arc::new(google_spreadsheets::to_mem_table(t).await?)
LoadedTable::new_from_table(Arc::new(google_spreadsheets::to_mem_table(t).await?))
}
TableLoadOption::xlsx { .. }
| TableLoadOption::xls { .. }
| TableLoadOption::xlsb { .. }
| TableLoadOption::ods { .. } => Arc::new(excel::to_mem_table(t).await?),
| TableLoadOption::ods { .. } => excel::to_datafusion_table(t).await?,
TableLoadOption::delta { .. } => delta::to_datafusion_table(t, dfctx).await?,
TableLoadOption::arrow { .. } => {
Arc::new(arrow_ipc_file::to_mem_table(t, dfctx).await?)
}
TableLoadOption::arrow { .. } => arrow_ipc_file::to_datafusion_table(t, dfctx).await?,
TableLoadOption::arrows { .. } => {
Arc::new(arrow_ipc_stream::to_mem_table(t, dfctx).await?)
}
TableLoadOption::mysql { .. } => {
Arc::new(database::DatabaseLoader::MySQL.to_mem_table(t)?)
}
TableLoadOption::sqlite { .. } => {
Arc::new(database::DatabaseLoader::SQLite.to_mem_table(t)?)
}
TableLoadOption::postgres { .. } => {
Arc::new(database::DatabaseLoader::Postgres.to_mem_table(t)?)
arrow_ipc_stream::to_datafusion_table(t, dfctx).await?
}
TableLoadOption::mysql { .. } => LoadedTable::new_from_table(Arc::new(
database::DatabaseLoader::MySQL.to_mem_table(t)?,
)),
TableLoadOption::sqlite { .. } => LoadedTable::new_from_table(Arc::new(
database::DatabaseLoader::SQLite.to_mem_table(t)?,
)),
TableLoadOption::postgres { .. } => LoadedTable::new_from_table(Arc::new(
database::DatabaseLoader::Postgres.to_mem_table(t)?,
)),
})
} else {
let t: Arc<dyn TableProvider> = match t.extension()? {
"csv" => csv::to_datafusion_table(t, dfctx).await?,
"json" => Arc::new(json::to_mem_table(t, dfctx).await?),
"ndjson" | "jsonl" => Arc::new(ndjson::to_mem_table(t, dfctx).await?),
"parquet" => parquet::to_datafusion_table(t, dfctx).await?,
"xls" | "xlsx" | "xlsb" | "ods" => Arc::new(excel::to_mem_table(t).await?),
"arrow" => Arc::new(arrow_ipc_file::to_mem_table(t, dfctx).await?),
"arrows" => Arc::new(arrow_ipc_stream::to_mem_table(t, dfctx).await?),
"mysql" => Arc::new(database::DatabaseLoader::MySQL.to_mem_table(t)?),
"sqlite" => Arc::new(database::DatabaseLoader::SQLite.to_mem_table(t)?),
"postgresql" => Arc::new(database::DatabaseLoader::Postgres.to_mem_table(t)?),
ext => {
return Err(Error::InvalidUri {
msg: format!(
"failed to register `{}` as table `{}`, unsupported table format `{}`",
t.io_source, t.name, ext,
),
});
}
};
Ok(t)
match t.extension()? {
"csv" => csv::to_datafusion_table(t, dfctx).await,
"json" => json::to_datafusion_table(t, dfctx).await,
"ndjson" | "jsonl" => ndjson::to_datafusion_table(t, dfctx).await,
"parquet" => parquet::to_datafusion_table(t, dfctx).await,
"xls" | "xlsx" | "xlsb" | "ods" => excel::to_datafusion_table(t).await,
"arrow" => arrow_ipc_file::to_datafusion_table(t, dfctx).await,
"arrows" => arrow_ipc_stream::to_datafusion_table(t, dfctx).await,
"mysql" => Ok(LoadedTable::new_from_table(Arc::new(
database::DatabaseLoader::MySQL.to_mem_table(t)?,
))),
"sqlite" => Ok(LoadedTable::new_from_table(Arc::new(
database::DatabaseLoader::SQLite.to_mem_table(t)?,
))),
"postgresql" => Ok(LoadedTable::new_from_table(Arc::new(
database::DatabaseLoader::Postgres.to_mem_table(t)?,
))),
ext => Err(Error::InvalidUri {
msg: format!(
"failed to register `{}` as table `{}`, unsupported table format `{}`",
t.io_source, t.name, ext,
),
}),
}
}
}
@ -903,8 +952,9 @@ schema:
let t = TableSource::new("uk_cities", "sqlite://../test_data/sqlite/sample.db");
let ctx = datafusion::prelude::SessionContext::new();
let table = load(&t, &ctx).await.unwrap();
let stats = table
let t = load(&t, &ctx).await.unwrap();
let stats = t
.table
.scan(&ctx.state(), None, &[], None)
.await
.unwrap()
@ -928,8 +978,9 @@ uri: "sqlite://../test_data/sqlite/sample.{}"
))
.unwrap();
let ctx = datafusion::prelude::SessionContext::new();
let table = load(&t, &ctx).await.unwrap();
let stats = table
let t = load(&t, &ctx).await.unwrap();
let stats = t
.table
.scan(&ctx.state(), None, &[], None)
.await
.unwrap()

View File

@ -7,7 +7,7 @@ use datafusion::arrow::json::reader::{infer_json_schema, ReaderBuilder};
use datafusion::arrow::record_batch::RecordBatch;
use snafu::prelude::*;
use crate::table::{self, TableSource};
use crate::table::{self, LoadedTable, TableSource};
#[derive(Debug, Snafu)]
pub enum Error {
@ -89,6 +89,15 @@ pub async fn to_mem_table(
.context(table::CreateMemTableSnafu)
}
pub async fn to_datafusion_table(
t: &TableSource,
dfctx: &datafusion::execution::context::SessionContext,
) -> Result<LoadedTable, table::Error> {
Ok(LoadedTable::new_from_table(Arc::new(
to_mem_table(t, dfctx).await?,
)))
}
#[cfg(test)]
mod tests {
use datafusion::{datasource::TableProvider, prelude::SessionContext};

View File

@ -15,7 +15,8 @@ use datafusion::parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
use snafu::prelude::*;
use crate::table::{
self, datafusion_get_or_infer_schema, TableLoadOption, TableOptionParquet, TableSource,
self, datafusion_get_or_infer_schema, LoadedTable, TableLoadOption, TableOptionParquet,
TableSource,
};
#[derive(Debug, Snafu)]
@ -47,7 +48,7 @@ pub enum Error {
pub async fn to_datafusion_table(
t: &TableSource,
dfctx: &datafusion::execution::context::SessionContext,
) -> Result<Arc<dyn TableProvider>, table::Error> {
) -> Result<LoadedTable, table::Error> {
let opt = t
.option
.clone()
@ -55,7 +56,7 @@ pub async fn to_datafusion_table(
let TableOptionParquet { use_memory_table } = opt.as_parquet()?;
if *use_memory_table {
to_mem_table(t, dfctx).await
Ok(LoadedTable::new_from_table(to_mem_table(t, dfctx).await?))
} else {
let table_url = ListingTableUrl::parse(t.get_uri_str())
.context(ParseUriSnafu)
@ -77,9 +78,9 @@ pub async fn to_datafusion_table(
let table_config = ListingTableConfig::new(table_url)
.with_listing_options(options)
.with_schema(schemaref);
Ok(Arc::new(
Ok(LoadedTable::new_from_table(Arc::new(
ListingTable::try_new(table_config).context(table::CreateListingTableSnafu)?,
))
)))
}
}
@ -178,6 +179,7 @@ mod tests {
.unwrap();
let stats = t
.table
.scan(&ctx.state(), None, &[], None)
.await
.unwrap()
@ -189,7 +191,7 @@ mod tests {
assert_eq!(stats[1].null_count, Precision::Exact(373));
assert_eq!(stats[2].null_count, Precision::Exact(237));
match t.as_any().downcast_ref::<ListingTable>() {
match t.table.as_any().downcast_ref::<ListingTable>() {
Some(_) => {}
None => panic!("must be of type datafusion::datasource::listing::ListingTable"),
}

View File

@ -126,8 +126,8 @@ schema:
// patch uri path with the correct test data path
table_source.io_source = table::TableIoSource::Uri(test_data_path("ubuntu-ami.json"));
let ctx = SessionContext::new();
let t = table::load(&table_source, &ctx).await;
Ok(whatever!(t, "failed to load table"))
let t = table::load(&table_source, &ctx).await.unwrap();
Ok(t.table)
}
pub fn register_table_properties(dfctx: &mut SessionContext) {

View File

@ -52,5 +52,8 @@ async fn infer_csv_schema_by_selected_files() {
builder.push(Field::new("ts", DataType::Int64, true));
builder.push(Field::new("value", DataType::Float64, true));
assert_eq!(t.schema(), Arc::new(Schema::new(builder.finish().fields)));
assert_eq!(
t.table.schema(),
Arc::new(Schema::new(builder.finish().fields))
);
}

View File

@ -1,6 +1,6 @@
[package]
name = "roapi"
version = "0.12.1"
version = "0.12.2"
authors = ["QP Hou <dave2008713@gmail.com>"]
homepage = "https://github.com/roapi/roapi"
license = "MIT"

View File

@ -99,7 +99,7 @@ fn address_flight_sql_arg() -> clap::Arg {
fn read_only_arg() -> clap::Arg {
clap::Arg::new("disable-read-only")
.help("Start roapi in read write mode")
.help("Start roapi in read write mode, allowing tables to be updated at runtime")
.required(false)
.num_args(0)
.long("disable-read-only")

View File

@ -25,10 +25,10 @@ pub struct RawRoapiContext {
}
impl RawRoapiContext {
pub async fn new(config: &Config) -> Result<Self, Whatever> {
pub async fn new(config: &Config, read_only: bool) -> Result<Self, Whatever> {
let mut cq = match config.get_datafusion_config() {
Ok(df_cfg) => ColumnQ::new_with_config(df_cfg),
_ => ColumnQ::new(),
Ok(df_cfg) => ColumnQ::new_with_config(df_cfg, read_only),
_ => ColumnQ::new_with_read_only(read_only),
};
if config.tables.is_empty() && config.kvstores.is_empty() {
@ -94,6 +94,8 @@ pub trait RoapiContext: Send + Sync + 'static {
async fn get_response_format(&self) -> encoding::ContentType;
async fn get_dfctx(&self) -> SessionContext;
async fn refresh_tables(&self) -> Result<(), ColumnQError>;
}
#[async_trait]
@ -203,6 +205,13 @@ impl RoapiContext for RawRoapiContext {
async fn get_dfctx(&self) -> SessionContext {
self.cq.dfctx.clone()
}
#[inline]
async fn refresh_tables(&self) -> Result<(), ColumnQError> {
Err(ColumnQError::Generic(
"Table refresh not supported in read only mode".to_string(),
))
}
}
#[async_trait]
@ -322,4 +331,10 @@ impl RoapiContext for ConcurrentRoapiContext {
let ctx = self.read().await;
ctx.cq.dfctx.clone()
}
#[inline]
async fn refresh_tables(&self) -> Result<(), ColumnQError> {
let mut ctx = self.write().await;
ctx.cq.refresh_tables().await
}
}

View File

@ -21,6 +21,7 @@ pub enum Error {
BuildFlightSqlServer { source: server::flight_sql::Error },
}
// TODO: replace table reloader with the new concurrent refresh infra
pub struct TableReloader {
reload_interval: Duration,
ctx_ext: Arc<RwLock<RawRoapiContext>>,
@ -58,7 +59,7 @@ impl Application {
pub async fn build(config: Config) -> Result<Self, Error> {
let default_host = std::env::var("HOST").unwrap_or_else(|_| "127.0.0.1".to_string());
let handler_ctx = RawRoapiContext::new(&config)
let handler_ctx = RawRoapiContext::new(&config, !config.disable_read_only)
.await
.expect("Failed to create Roapi context");
@ -70,7 +71,7 @@ impl Application {
let tables = Arc::new(Mutex::new(tables));
if config.disable_read_only {
let ctx_ext = Arc::new(RwLock::new(handler_ctx));
let ctx_ext: Arc<ConcurrentRoapiContext> = Arc::new(RwLock::new(handler_ctx));
let postgres_server = Box::new(
server::postgres::PostgresServer::new(
ctx_ext.clone(),
@ -97,14 +98,18 @@ impl Application {
);
let (http_server, http_addr) =
server::http::build_http_server::<ConcurrentRoapiContext>(
ctx_ext,
tables,
&config,
default_host,
)
.await
.context(BuildHttpServerSnafu)?;
server::http::build_http_server(ctx_ext.clone(), tables, &config, default_host)
.await
.context(BuildHttpServerSnafu)?;
let _handle = tokio::task::spawn(async move {
loop {
if let Err(e) = ctx_ext.refresh_tables().await {
error!("Failed to refresh table: {:?}", e);
}
time::sleep(Duration::from_millis(1000)).await;
}
});
Ok(Self {
http_addr,
@ -164,8 +169,6 @@ impl Application {
}
pub async fn run_until_stopped(self) -> Result<(), Error> {
// FIXME: exit from tokio spawn
let postgres_server = self.postgres_server;
info!(
"🚀 Listening on {} for Postgres traffic...",