mirror of
https://github.com/roapi/roapi.git
synced 2026-06-05 21:04:02 +08:00
refactor: make table refresher a required field (#360)
Other drive-by changes: * add nix flake for quick dev env setup
This commit is contained in:
parent
10bdbfa314
commit
49da15e4c0
@ -116,7 +116,7 @@ impl ColumnQ {
|
||||
&mut self,
|
||||
name: impl Into<String>,
|
||||
mut refresher: TableRefresher,
|
||||
interval: std::time::Duration,
|
||||
interval: tokio::time::Duration,
|
||||
) {
|
||||
let tx = self.refresh_tx.clone();
|
||||
let name = name.into();
|
||||
@ -176,9 +176,8 @@ impl ColumnQ {
|
||||
self.register_table(t.name.to_string(), loaded_table.table)?;
|
||||
|
||||
if !self.read_only {
|
||||
if let (Some(refresher), Some(interval)) = (loaded_table.refresher, t.refresh_interval)
|
||||
{
|
||||
self.register_refresher(t.name.to_string(), refresher, interval);
|
||||
if let Some(interval) = t.refresh_interval {
|
||||
self.register_refresher(t.name.to_string(), loaded_table.refresher, interval);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@ -3,6 +3,7 @@ use std::sync::Arc;
|
||||
use datafusion::arrow;
|
||||
use datafusion::arrow::datatypes::Schema;
|
||||
use datafusion::arrow::record_batch::RecordBatch;
|
||||
use datafusion::datasource::TableProvider;
|
||||
use log::debug;
|
||||
use snafu::prelude::*;
|
||||
|
||||
@ -71,13 +72,18 @@ pub async fn to_mem_table(
|
||||
.context(table::CreateMemTableSnafu)
|
||||
}
|
||||
|
||||
pub async fn to_datafusion_table(
|
||||
t: &TableSource,
|
||||
dfctx: &datafusion::execution::context::SessionContext,
|
||||
/// Loads the table described by `t` via `to_mem_table` and returns it as a
/// shared `Arc<dyn TableProvider>`.
///
/// `t` and `dfctx` are taken by value and only lent to `to_mem_table`; an
/// error from `to_mem_table` is propagated with `?`.
async fn to_datafusion_table(
|
||||
t: TableSource,
|
||||
dfctx: datafusion::execution::context::SessionContext,
|
||||
) -> Result<Arc<dyn TableProvider>, table::Error> {
|
||||
Ok(Arc::new(to_mem_table(&t, &dfctx).await?))
|
||||
}
|
||||
|
||||
pub async fn to_loaded_table(
|
||||
t: TableSource,
|
||||
dfctx: datafusion::execution::context::SessionContext,
|
||||
) -> Result<LoadedTable, table::Error> {
|
||||
Ok(LoadedTable::new_from_table(Arc::new(
|
||||
to_mem_table(t, dfctx).await?,
|
||||
)))
|
||||
LoadedTable::new_from_df_table_cb(move || to_datafusion_table(t.clone(), dfctx.clone())).await
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
@ -85,7 +91,6 @@ mod tests {
|
||||
use super::*;
|
||||
|
||||
use datafusion::common::stats::Precision;
|
||||
use datafusion::datasource::TableProvider;
|
||||
use datafusion::prelude::SessionContext;
|
||||
use std::fs;
|
||||
use tempfile::Builder;
|
||||
|
||||
@ -3,6 +3,7 @@ use std::sync::Arc;
|
||||
use datafusion::arrow;
|
||||
use datafusion::arrow::datatypes::Schema;
|
||||
use datafusion::arrow::record_batch::RecordBatch;
|
||||
use datafusion::datasource::TableProvider;
|
||||
use log::debug;
|
||||
use snafu::prelude::*;
|
||||
|
||||
@ -71,13 +72,18 @@ pub async fn to_mem_table(
|
||||
.context(table::CreateMemTableSnafu)
|
||||
}
|
||||
|
||||
pub async fn to_datafusion_table(
|
||||
t: &TableSource,
|
||||
dfctx: &datafusion::execution::context::SessionContext,
|
||||
/// Loads the table described by `t` via `to_mem_table` and returns it as a
/// shared `Arc<dyn TableProvider>`.
///
/// `t` and `dfctx` are taken by value and only lent to `to_mem_table`; an
/// error from `to_mem_table` is propagated with `?`.
async fn to_datafusion_table(
|
||||
t: TableSource,
|
||||
dfctx: datafusion::execution::context::SessionContext,
|
||||
) -> Result<Arc<dyn TableProvider>, table::Error> {
|
||||
Ok(Arc::new(to_mem_table(&t, &dfctx).await?))
|
||||
}
|
||||
|
||||
pub async fn to_loaded_table(
|
||||
t: TableSource,
|
||||
dfctx: datafusion::execution::context::SessionContext,
|
||||
) -> Result<LoadedTable, table::Error> {
|
||||
Ok(LoadedTable::new_from_table(Arc::new(
|
||||
to_mem_table(t, dfctx).await?,
|
||||
)))
|
||||
LoadedTable::new_from_df_table_cb(move || to_datafusion_table(t.clone(), dfctx.clone())).await
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
@ -85,7 +91,6 @@ mod tests {
|
||||
use super::*;
|
||||
|
||||
use datafusion::common::stats::Precision;
|
||||
use datafusion::datasource::TableProvider;
|
||||
use datafusion::prelude::SessionContext;
|
||||
use std::fs;
|
||||
use tempfile::Builder;
|
||||
|
||||
@ -34,10 +34,10 @@ pub enum Error {
|
||||
},
|
||||
}
|
||||
|
||||
pub async fn to_datafusion_table(
|
||||
t: &TableSource,
|
||||
dfctx: &datafusion::execution::context::SessionContext,
|
||||
) -> Result<LoadedTable, table::Error> {
|
||||
async fn to_datafusion_table(
|
||||
t: TableSource,
|
||||
dfctx: datafusion::execution::context::SessionContext,
|
||||
) -> Result<Arc<dyn TableProvider>, table::Error> {
|
||||
let opt = t
|
||||
.option
|
||||
.clone()
|
||||
@ -46,7 +46,7 @@ pub async fn to_datafusion_table(
|
||||
.as_csv()
|
||||
.expect("Invalid table format option, expect csv");
|
||||
if opt.use_memory_table {
|
||||
return Ok(LoadedTable::new_from_table(to_mem_table(t, dfctx).await?));
|
||||
return to_mem_table(&t, &dfctx).await;
|
||||
}
|
||||
let table_url =
|
||||
ListingTableUrl::parse(t.get_uri_str()).with_context(|_| table::ListingTableUriSnafu {
|
||||
@ -58,7 +58,7 @@ pub async fn to_datafusion_table(
|
||||
}
|
||||
|
||||
let schemaref = datafusion_get_or_infer_schema(
|
||||
dfctx,
|
||||
&dfctx,
|
||||
&table_url,
|
||||
&options,
|
||||
&t.schema,
|
||||
@ -69,12 +69,12 @@ pub async fn to_datafusion_table(
|
||||
let table_config = ListingTableConfig::new(table_url)
|
||||
.with_listing_options(options)
|
||||
.with_schema(schemaref);
|
||||
Ok(LoadedTable::new_from_table(Arc::new(
|
||||
Ok(Arc::new(
|
||||
ListingTable::try_new(table_config).context(table::CreateListingTableSnafu)?,
|
||||
)))
|
||||
))
|
||||
}
|
||||
|
||||
pub async fn to_mem_table(
|
||||
async fn to_mem_table(
|
||||
t: &TableSource,
|
||||
dfctx: &datafusion::execution::context::SessionContext,
|
||||
) -> Result<Arc<dyn TableProvider>, table::Error> {
|
||||
@ -156,6 +156,13 @@ pub async fn to_mem_table(
|
||||
Ok(table)
|
||||
}
|
||||
|
||||
/// Builds a `LoadedTable` for `t`.
///
/// The closure handed to `LoadedTable::new_from_df_table_cb` calls
/// `to_datafusion_table` with fresh clones of `t` and `dfctx` every time it is
/// invoked, so the owned `t` and `dfctx` stay available for later calls.
pub async fn to_loaded_table(
|
||||
t: TableSource,
|
||||
dfctx: datafusion::execution::context::SessionContext,
|
||||
) -> Result<LoadedTable, table::Error> {
|
||||
LoadedTable::new_from_df_table_cb(move || to_datafusion_table(t.clone(), dfctx.clone())).await
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
@ -65,7 +65,7 @@ async fn update_table(
|
||||
Ok(Arc::new(t) as Arc<dyn TableProvider>)
|
||||
}
|
||||
|
||||
pub async fn to_datafusion_table(
|
||||
pub async fn to_loaded_table(
|
||||
t: &TableSource,
|
||||
dfctx: &datafusion::execution::context::SessionContext,
|
||||
) -> Result<LoadedTable, table::Error> {
|
||||
@ -91,9 +91,16 @@ pub async fn to_datafusion_table(
|
||||
let batch_size = t.batch_size;
|
||||
|
||||
if *use_memory_table {
|
||||
Ok(LoadedTable::new_from_table(
|
||||
to_mem_table(delta_table, blob_type, batch_size, dfctx).await?,
|
||||
))
|
||||
let dfctx = dfctx.clone();
|
||||
let to_datafusion_table = move || {
|
||||
to_mem_table(
|
||||
delta_table.clone(),
|
||||
blob_type.clone(),
|
||||
batch_size,
|
||||
dfctx.clone(),
|
||||
)
|
||||
};
|
||||
LoadedTable::new_from_df_table_cb(to_datafusion_table).await
|
||||
} else {
|
||||
let curr_table = delta_table.clone();
|
||||
let df_table = cast_datafusion_table(delta_table, blob_type)?;
|
||||
@ -151,7 +158,7 @@ pub async fn to_mem_table(
|
||||
delta_table: deltalake::DeltaTable,
|
||||
blob_type: io::BlobStoreType,
|
||||
batch_size: usize,
|
||||
dfctx: &datafusion::execution::context::SessionContext,
|
||||
dfctx: datafusion::execution::context::SessionContext,
|
||||
) -> Result<Arc<dyn TableProvider>, table::Error> {
|
||||
let paths = delta_table
|
||||
.get_file_uris()
|
||||
@ -183,7 +190,7 @@ pub async fn to_mem_table(
|
||||
|r| -> Result<Vec<RecordBatch>, table::Error> {
|
||||
read_partition::<std::io::Cursor<Vec<u8>>>(r, batch_size)
|
||||
},
|
||||
dfctx,
|
||||
&dfctx,
|
||||
)
|
||||
.await
|
||||
.context(table::IoSnafu)?
|
||||
@ -226,7 +233,7 @@ mod tests {
|
||||
#[tokio::test]
|
||||
async fn load_delta_as_memtable() {
|
||||
let ctx = SessionContext::new();
|
||||
let t = to_datafusion_table(
|
||||
let t = to_loaded_table(
|
||||
&TableSource::new("blogs".to_string(), test_data_path("blogs-delta")).with_option(
|
||||
TableLoadOption::delta(TableOptionDelta {
|
||||
use_memory_table: true,
|
||||
@ -254,7 +261,7 @@ mod tests {
|
||||
#[tokio::test]
|
||||
async fn load_delta_as_delta_source() {
|
||||
let ctx = SessionContext::new();
|
||||
let t = to_datafusion_table(
|
||||
let t = to_loaded_table(
|
||||
&TableSource::new("blogs".to_string(), test_data_path("blogs-delta")).with_option(
|
||||
TableLoadOption::delta(TableOptionDelta {
|
||||
use_memory_table: false,
|
||||
|
||||
@ -8,6 +8,7 @@ use datafusion::arrow::datatypes::{
|
||||
DataType, Date32Type, Date64Type, Field, Float64Type, Int64Type, Schema,
|
||||
};
|
||||
use datafusion::arrow::record_batch::RecordBatch;
|
||||
use datafusion::datasource::TableProvider;
|
||||
use snafu::prelude::*;
|
||||
use std::collections::HashMap;
|
||||
use std::sync::Arc;
|
||||
@ -381,10 +382,15 @@ pub async fn to_mem_table(
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn to_datafusion_table(t: &TableSource) -> Result<LoadedTable, table::Error> {
|
||||
Ok(LoadedTable::new_from_table(Arc::new(
|
||||
to_mem_table(t).await?,
|
||||
)))
|
||||
/// Loads the table described by `t` via `to_mem_table` and returns it as a
/// shared `Arc<dyn TableProvider>`; a load error is propagated with `?`.
async fn to_datafusion_table(t: TableSource) -> Result<Arc<dyn TableProvider>, table::Error> {
|
||||
Ok(Arc::new(to_mem_table(&t).await?))
|
||||
}
|
||||
|
||||
/// Builds a `LoadedTable` for `t` whose refresher reloads the table from scratch.
///
/// The boxed `reloader` closure owns `t` and, on every call, pins a new
/// `to_datafusion_table` future over a clone of it. It is awaited once here for
/// the initial table (errors propagate with `?`) and then stored as the
/// refresher.
// NOTE(review): this looks equivalent to what `LoadedTable::new_from_df_table_cb`
// does for the other loaders; consider calling it instead - confirm.
pub async fn to_loaded_table(t: TableSource) -> Result<LoadedTable, table::Error> {
|
||||
let reloader = Box::new(move || {
|
||||
Box::pin(to_datafusion_table(t.clone())) as crate::table::TableRefresherOutput
|
||||
});
|
||||
// First call performs the initial load; the same closure becomes the refresher.
Ok(LoadedTable::new(reloader().await?, reloader))
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
|
||||
@ -5,13 +5,14 @@ use datafusion::arrow::array::{ArrayRef, BooleanArray, PrimitiveArray, StringArr
|
||||
use datafusion::arrow::datatypes::{DataType, Field, Schema};
|
||||
use datafusion::arrow::datatypes::{Float64Type, Int64Type};
|
||||
use datafusion::arrow::record_batch::RecordBatch;
|
||||
use datafusion::datasource::TableProvider;
|
||||
use regex::Regex;
|
||||
use reqwest::Client;
|
||||
use serde_derive::Deserialize;
|
||||
use snafu::prelude::*;
|
||||
use uriparse::URIReference;
|
||||
|
||||
use crate::table::{self, TableOptionGoogleSpreadsheet, TableSource};
|
||||
use crate::table::{self, LoadedTable, TableOptionGoogleSpreadsheet, TableSource};
|
||||
|
||||
#[derive(Debug, Snafu)]
|
||||
pub enum Error {
|
||||
@ -308,9 +309,13 @@ async fn resolve_sheet_title<'a, 'b, 'c, 'd>(
|
||||
Ok(sheet.properties.title.clone())
|
||||
}
|
||||
|
||||
pub async fn to_mem_table(
|
||||
t: &TableSource,
|
||||
) -> Result<datafusion::datasource::MemTable, table::Error> {
|
||||
/// Everything needed to issue one values request: the access token and the
/// already formatted request URL.
///
/// `Clone` is derived so a copy can be handed to each load call.
#[derive(Clone)]
|
||||
struct GetReqContext {
|
||||
// Access token; its string form is read with `.token()` when the request is made.
token: yup_oauth2::AccessToken,
|
||||
// Full URL passed to `gs_api_get`.
url: String,
|
||||
}
|
||||
|
||||
async fn gs_get_req_contex(t: &TableSource) -> Result<GetReqContext, table::Error> {
|
||||
lazy_static! {
|
||||
static ref RE_GOOGLE_SHEET: Regex =
|
||||
Regex::new(r"https://docs.google.com/spreadsheets/d/(.+)").unwrap();
|
||||
@ -347,17 +352,28 @@ pub async fn to_mem_table(
|
||||
.context(table::LoadGoogleSheetSnafu)?,
|
||||
};
|
||||
|
||||
let resp = gs_api_get(
|
||||
token_str,
|
||||
&format!(
|
||||
"https://sheets.googleapis.com/v4/spreadsheets/{spreadsheet_id}/values/{sheet_title}",
|
||||
Ok(GetReqContext {
|
||||
token,
|
||||
url: format!(
|
||||
"https://sheets.googleapis.com/v4/spreadsheets/{spreadsheet_id}/values/{sheet_title}"
|
||||
),
|
||||
)
|
||||
.await
|
||||
.context(table::LoadGoogleSheetSnafu)?
|
||||
.error_for_status()
|
||||
.context(HttpStatusSnafu)
|
||||
.context(table::LoadGoogleSheetSnafu)?;
|
||||
})
|
||||
}
|
||||
|
||||
async fn to_mem_table(
|
||||
ctx: GetReqContext,
|
||||
) -> Result<datafusion::datasource::MemTable, table::Error> {
|
||||
let token_str = ctx
|
||||
.token
|
||||
.token()
|
||||
.ok_or(Error::EmptyToken {})
|
||||
.context(table::LoadGoogleSheetSnafu)?;
|
||||
let resp = gs_api_get(token_str, &ctx.url)
|
||||
.await
|
||||
.context(table::LoadGoogleSheetSnafu)?
|
||||
.error_for_status()
|
||||
.context(HttpStatusSnafu)
|
||||
.context(table::LoadGoogleSheetSnafu)?;
|
||||
|
||||
let sheet = resp
|
||||
.json::<SpreadsheetValues>()
|
||||
@ -373,6 +389,15 @@ pub async fn to_mem_table(
|
||||
.context(table::CreateMemTableSnafu)
|
||||
}
|
||||
|
||||
/// Builds a `LoadedTable` for the spreadsheet described by `t`.
///
/// The request context is resolved once, up front, by `gs_get_req_contex`;
/// a failure there is returned before any table is loaded. The callback
/// given to `LoadedTable::new_from_df_table_cb` then clones that context and
/// runs `to_mem_table` on the clone each time it is invoked.
// NOTE(review): the token inside `ctx` is captured once and reused by every
// later invocation; if it can expire, refreshes would keep sending the stale
// token - confirm.
pub async fn to_loaded_table(t: &TableSource) -> Result<LoadedTable, table::Error> {
|
||||
let ctx = gs_get_req_contex(t).await?;
|
||||
let to_datafusion_table = move || {
|
||||
// Clone per call so the closure keeps its own copy for the next invocation.
let ctx = ctx.clone();
|
||||
async { Ok(Arc::new(to_mem_table(ctx).await?) as Arc<dyn TableProvider>) }
|
||||
};
|
||||
LoadedTable::new_from_df_table_cb(to_datafusion_table).await
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
@ -6,6 +6,7 @@ use datafusion::arrow::datatypes::Schema;
|
||||
#[allow(deprecated)]
|
||||
use datafusion::arrow::json::reader::ReaderBuilder;
|
||||
use datafusion::arrow::record_batch::RecordBatch;
|
||||
use datafusion::datasource::TableProvider;
|
||||
use serde_json::value::Value;
|
||||
use snafu::prelude::*;
|
||||
|
||||
@ -213,13 +214,18 @@ pub async fn to_mem_table(
|
||||
.context(table::CreateMemTableSnafu)
|
||||
}
|
||||
|
||||
pub async fn to_datafusion_table(
|
||||
t: &TableSource,
|
||||
dfctx: &datafusion::execution::context::SessionContext,
|
||||
/// Loads the table described by `t` via `to_mem_table` and returns it as a
/// shared `Arc<dyn TableProvider>`.
///
/// `t` and `dfctx` are taken by value and only lent to `to_mem_table`; an
/// error from `to_mem_table` is propagated with `?`.
async fn to_datafusion_table(
|
||||
t: TableSource,
|
||||
dfctx: datafusion::execution::context::SessionContext,
|
||||
) -> Result<Arc<dyn TableProvider>, table::Error> {
|
||||
Ok(Arc::new(to_mem_table(&t, &dfctx).await?))
|
||||
}
|
||||
|
||||
pub async fn to_loaded_table(
|
||||
t: TableSource,
|
||||
dfctx: datafusion::execution::context::SessionContext,
|
||||
) -> Result<LoadedTable, table::Error> {
|
||||
Ok(LoadedTable::new_from_table(Arc::new(
|
||||
to_mem_table(t, dfctx).await?,
|
||||
)))
|
||||
LoadedTable::new_from_df_table_cb(move || to_datafusion_table(t.clone(), dfctx.clone())).await
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
|
||||
@ -433,7 +433,7 @@ pub struct TableSource {
|
||||
pub batch_size: usize,
|
||||
pub partition_columns: Option<Vec<TableColumn>>,
|
||||
#[serde(default = "TableSource::default_refresh_interval")]
|
||||
pub refresh_interval: Option<std::time::Duration>,
|
||||
pub refresh_interval: Option<tokio::time::Duration>,
|
||||
}
|
||||
|
||||
impl From<KeyValueSource> for TableSource {
|
||||
@ -650,36 +650,39 @@ pub async fn datafusion_get_or_infer_schema(
|
||||
})
|
||||
}
|
||||
|
||||
// pub type TableRefresher = Arc<
|
||||
// tokio::sync::Mutex<
|
||||
// dyn FnMut() -> Pin<Box<dyn Future<Output = Result<Arc<dyn TableProvider>, Error>> + Send>>
|
||||
// + Send,
|
||||
// >,
|
||||
// >;
|
||||
//
|
||||
pub type TableRefresher = Box<
|
||||
dyn FnMut() -> Pin<Box<dyn Future<Output = Result<Arc<dyn TableProvider>, Error>> + Send>>
|
||||
+ Send,
|
||||
>;
|
||||
/// Boxed, pinned, `Send` future that resolves to either a shared table
/// provider or an `Error`; this is what a `TableRefresher` returns per call.
pub type TableRefresherOutput =
|
||||
Pin<Box<dyn Future<Output = Result<Arc<dyn TableProvider>, Error>> + Send>>;
|
||||
|
||||
/// Boxed `FnMut` closure (itself `Send`) that returns a new
/// `TableRefresherOutput` each time it is called.
pub type TableRefresher = Box<dyn FnMut() -> TableRefresherOutput + Send>;
|
||||
|
||||
pub struct LoadedTable {
|
||||
pub table: Arc<dyn TableProvider>,
|
||||
pub refresher: Option<TableRefresher>,
|
||||
pub refresher: TableRefresher,
|
||||
}
|
||||
|
||||
impl LoadedTable {
|
||||
pub fn new(table: Arc<dyn TableProvider>, refresher: TableRefresher) -> Self {
|
||||
Self {
|
||||
table,
|
||||
refresher: Some(refresher),
|
||||
}
|
||||
Self { table, refresher }
|
||||
}
|
||||
|
||||
pub fn new_from_table(table: Arc<dyn TableProvider>) -> Self {
|
||||
Self {
|
||||
table,
|
||||
refresher: None,
|
||||
}
|
||||
/// Builds a `LoadedTable` from a callback that produces the table.
///
/// `to_df_table_cb` must be `Send + 'static` and return a `Send + 'static`
/// future. It is wrapped in a boxed closure that pins each returned future as
/// a `TableRefresherOutput`. That closure is awaited once here to obtain the
/// initial table and is then stored as the refresher, so the initial load and
/// every later refresh go through the same callback.
///
/// Returns the callback's error if the initial load fails.
pub async fn new_from_df_table_cb<F, Fut>(to_df_table_cb: F) -> Result<Self, Error>
|
||||
where
|
||||
F: Fn() -> Fut + Send + 'static,
|
||||
Fut: Future<Output = Result<Arc<dyn TableProvider>, Error>> + Send + 'static,
|
||||
{
|
||||
// Adapt the generic callback to the boxed-future shape of `TableRefresherOutput`.
let refresher =
|
||||
Box::new(move || Box::pin(to_df_table_cb()) as crate::table::TableRefresherOutput);
|
||||
// First call loads the initial table; the closure itself is kept as the refresher.
Ok(Self::new(refresher().await?, refresher))
|
||||
}
|
||||
|
||||
/// Wraps an already built table in a `LoadedTable`.
///
/// The refresher created here does not reload anything: each call resolves
/// immediately to another clone of the same `Arc` that was passed in.
pub fn new_from_df_table(table: Arc<dyn TableProvider>) -> Self {
|
||||
// Keep one handle for the `table` field; the original moves into the closure.
let t = table.clone();
|
||||
let refresher = Box::new(move || {
|
||||
let t = table.clone();
|
||||
let fut = async { Ok(t) };
|
||||
Box::pin(fut) as crate::table::TableRefresherOutput
|
||||
});
|
||||
Self::new(t, refresher)
|
||||
}
|
||||
}
|
||||
|
||||
@ -689,50 +692,54 @@ pub async fn load(
|
||||
) -> Result<LoadedTable, Error> {
|
||||
if let Some(opt) = &t.option {
|
||||
Ok(match opt {
|
||||
TableLoadOption::json { .. } => json::to_datafusion_table(t, dfctx).await?,
|
||||
TableLoadOption::json { .. } => json::to_loaded_table(t.clone(), dfctx.clone()).await?,
|
||||
TableLoadOption::ndjson { .. } | TableLoadOption::jsonl { .. } => {
|
||||
ndjson::to_datafusion_table(t, dfctx).await?
|
||||
ndjson::to_loaded_table(t.clone(), dfctx.clone()).await?
|
||||
}
|
||||
TableLoadOption::csv { .. } => csv::to_loaded_table(t.clone(), dfctx.clone()).await?,
|
||||
TableLoadOption::parquet { .. } => {
|
||||
parquet::to_loaded_table(t.clone(), dfctx.clone()).await?
|
||||
}
|
||||
TableLoadOption::csv { .. } => csv::to_datafusion_table(t, dfctx).await?,
|
||||
TableLoadOption::parquet { .. } => parquet::to_datafusion_table(t, dfctx).await?,
|
||||
TableLoadOption::google_spreadsheet(_) => {
|
||||
LoadedTable::new_from_table(Arc::new(google_spreadsheets::to_mem_table(t).await?))
|
||||
google_spreadsheets::to_loaded_table(t).await?
|
||||
}
|
||||
TableLoadOption::xlsx { .. }
|
||||
| TableLoadOption::xls { .. }
|
||||
| TableLoadOption::xlsb { .. }
|
||||
| TableLoadOption::ods { .. } => excel::to_datafusion_table(t).await?,
|
||||
TableLoadOption::delta { .. } => delta::to_datafusion_table(t, dfctx).await?,
|
||||
TableLoadOption::arrow { .. } => arrow_ipc_file::to_datafusion_table(t, dfctx).await?,
|
||||
TableLoadOption::arrows { .. } => {
|
||||
arrow_ipc_stream::to_datafusion_table(t, dfctx).await?
|
||||
| TableLoadOption::ods { .. } => excel::to_loaded_table(t.clone()).await?,
|
||||
TableLoadOption::delta { .. } => delta::to_loaded_table(t, dfctx).await?,
|
||||
TableLoadOption::arrow { .. } => {
|
||||
arrow_ipc_file::to_loaded_table(t.clone(), dfctx.clone()).await?
|
||||
}
|
||||
TableLoadOption::mysql { .. } => LoadedTable::new_from_table(Arc::new(
|
||||
TableLoadOption::arrows { .. } => {
|
||||
arrow_ipc_stream::to_loaded_table(t.clone(), dfctx.clone()).await?
|
||||
}
|
||||
TableLoadOption::mysql { .. } => LoadedTable::new_from_df_table(Arc::new(
|
||||
database::DatabaseLoader::MySQL.to_mem_table(t)?,
|
||||
)),
|
||||
TableLoadOption::sqlite { .. } => LoadedTable::new_from_table(Arc::new(
|
||||
TableLoadOption::sqlite { .. } => LoadedTable::new_from_df_table(Arc::new(
|
||||
database::DatabaseLoader::SQLite.to_mem_table(t)?,
|
||||
)),
|
||||
TableLoadOption::postgres { .. } => LoadedTable::new_from_table(Arc::new(
|
||||
TableLoadOption::postgres { .. } => LoadedTable::new_from_df_table(Arc::new(
|
||||
database::DatabaseLoader::Postgres.to_mem_table(t)?,
|
||||
)),
|
||||
})
|
||||
} else {
|
||||
match t.extension()? {
|
||||
"csv" => csv::to_datafusion_table(t, dfctx).await,
|
||||
"json" => json::to_datafusion_table(t, dfctx).await,
|
||||
"ndjson" | "jsonl" => ndjson::to_datafusion_table(t, dfctx).await,
|
||||
"parquet" => parquet::to_datafusion_table(t, dfctx).await,
|
||||
"xls" | "xlsx" | "xlsb" | "ods" => excel::to_datafusion_table(t).await,
|
||||
"arrow" => arrow_ipc_file::to_datafusion_table(t, dfctx).await,
|
||||
"arrows" => arrow_ipc_stream::to_datafusion_table(t, dfctx).await,
|
||||
"mysql" => Ok(LoadedTable::new_from_table(Arc::new(
|
||||
"csv" => csv::to_loaded_table(t.clone(), dfctx.clone()).await,
|
||||
"json" => json::to_loaded_table(t.clone(), dfctx.clone()).await,
|
||||
"ndjson" | "jsonl" => ndjson::to_loaded_table(t.clone(), dfctx.clone()).await,
|
||||
"parquet" => parquet::to_loaded_table(t.clone(), dfctx.clone()).await,
|
||||
"xls" | "xlsx" | "xlsb" | "ods" => excel::to_loaded_table(t.clone()).await,
|
||||
"arrow" => arrow_ipc_file::to_loaded_table(t.clone(), dfctx.clone()).await,
|
||||
"arrows" => arrow_ipc_stream::to_loaded_table(t.clone(), dfctx.clone()).await,
|
||||
"mysql" => Ok(LoadedTable::new_from_df_table(Arc::new(
|
||||
database::DatabaseLoader::MySQL.to_mem_table(t)?,
|
||||
))),
|
||||
"sqlite" => Ok(LoadedTable::new_from_table(Arc::new(
|
||||
"sqlite" => Ok(LoadedTable::new_from_df_table(Arc::new(
|
||||
database::DatabaseLoader::SQLite.to_mem_table(t)?,
|
||||
))),
|
||||
"postgresql" => Ok(LoadedTable::new_from_table(Arc::new(
|
||||
"postgresql" => Ok(LoadedTable::new_from_df_table(Arc::new(
|
||||
database::DatabaseLoader::Postgres.to_mem_table(t)?,
|
||||
))),
|
||||
ext => Err(Error::InvalidUri {
|
||||
|
||||
@ -5,6 +5,7 @@ use datafusion::arrow::datatypes::{Schema, SchemaRef};
|
||||
#[allow(deprecated)]
|
||||
use datafusion::arrow::json::reader::{infer_json_schema, ReaderBuilder};
|
||||
use datafusion::arrow::record_batch::RecordBatch;
|
||||
use datafusion::datasource::TableProvider;
|
||||
use snafu::prelude::*;
|
||||
|
||||
use crate::table::{self, LoadedTable, TableSource};
|
||||
@ -89,13 +90,18 @@ pub async fn to_mem_table(
|
||||
.context(table::CreateMemTableSnafu)
|
||||
}
|
||||
|
||||
pub async fn to_datafusion_table(
|
||||
t: &TableSource,
|
||||
dfctx: &datafusion::execution::context::SessionContext,
|
||||
/// Loads the table described by `t` via `to_mem_table` and returns it as a
/// shared `Arc<dyn TableProvider>`.
///
/// `t` and `dfctx` are taken by value and only lent to `to_mem_table`; an
/// error from `to_mem_table` is propagated with `?`.
async fn to_datafusion_table(
|
||||
t: TableSource,
|
||||
dfctx: datafusion::execution::context::SessionContext,
|
||||
) -> Result<Arc<dyn TableProvider>, table::Error> {
|
||||
Ok(Arc::new(to_mem_table(&t, &dfctx).await?))
|
||||
}
|
||||
|
||||
pub async fn to_loaded_table(
|
||||
t: TableSource,
|
||||
dfctx: datafusion::execution::context::SessionContext,
|
||||
) -> Result<LoadedTable, table::Error> {
|
||||
Ok(LoadedTable::new_from_table(Arc::new(
|
||||
to_mem_table(t, dfctx).await?,
|
||||
)))
|
||||
LoadedTable::new_from_df_table_cb(move || to_datafusion_table(t.clone(), dfctx.clone())).await
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
|
||||
@ -45,10 +45,17 @@ pub enum Error {
|
||||
},
|
||||
}
|
||||
|
||||
pub async fn to_datafusion_table(
|
||||
t: &TableSource,
|
||||
dfctx: &datafusion::execution::context::SessionContext,
|
||||
/// Builds a `LoadedTable` for `t`.
///
/// The closure handed to `LoadedTable::new_from_df_table_cb` calls
/// `to_datafusion_table` with fresh clones of `t` and `dfctx` every time it is
/// invoked, so the owned `t` and `dfctx` stay available for later calls.
pub async fn to_loaded_table(
|
||||
t: TableSource,
|
||||
dfctx: datafusion::execution::context::SessionContext,
|
||||
) -> Result<LoadedTable, table::Error> {
|
||||
LoadedTable::new_from_df_table_cb(move || to_datafusion_table(t.clone(), dfctx.clone())).await
|
||||
}
|
||||
|
||||
async fn to_datafusion_table(
|
||||
t: TableSource,
|
||||
dfctx: datafusion::execution::context::SessionContext,
|
||||
) -> Result<Arc<dyn TableProvider>, table::Error> {
|
||||
let opt = t
|
||||
.option
|
||||
.clone()
|
||||
@ -56,7 +63,7 @@ pub async fn to_datafusion_table(
|
||||
let TableOptionParquet { use_memory_table } = opt.as_parquet()?;
|
||||
|
||||
if *use_memory_table {
|
||||
Ok(LoadedTable::new_from_table(to_mem_table(t, dfctx).await?))
|
||||
to_mem_table(&t, &dfctx).await
|
||||
} else {
|
||||
let table_url = ListingTableUrl::parse(t.get_uri_str())
|
||||
.context(ParseUriSnafu)
|
||||
@ -67,7 +74,7 @@ pub async fn to_datafusion_table(
|
||||
}
|
||||
|
||||
let schemaref = datafusion_get_or_infer_schema(
|
||||
dfctx,
|
||||
&dfctx,
|
||||
&table_url,
|
||||
&options,
|
||||
&t.schema,
|
||||
@ -78,9 +85,9 @@ pub async fn to_datafusion_table(
|
||||
let table_config = ListingTableConfig::new(table_url)
|
||||
.with_listing_options(options)
|
||||
.with_schema(schemaref);
|
||||
Ok(LoadedTable::new_from_table(Arc::new(
|
||||
Ok(Arc::new(
|
||||
ListingTable::try_new(table_config).context(table::CreateListingTableSnafu)?,
|
||||
)))
|
||||
))
|
||||
}
|
||||
}
|
||||
|
||||
@ -165,15 +172,15 @@ mod tests {
|
||||
#[tokio::test]
|
||||
async fn load_flattened_parquet() {
|
||||
let ctx = SessionContext::new();
|
||||
let t = to_datafusion_table(
|
||||
&TableSource::new(
|
||||
let t = to_loaded_table(
|
||||
TableSource::new(
|
||||
"blogs".to_string(),
|
||||
test_data_path("blogs_flattened.parquet"),
|
||||
)
|
||||
.with_option(TableLoadOption::parquet(TableOptionParquet {
|
||||
use_memory_table: false,
|
||||
})),
|
||||
&ctx,
|
||||
ctx.clone(),
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
@ -5,7 +5,7 @@ use std::sync::Arc;
|
||||
use datafusion::arrow;
|
||||
use datafusion::prelude::SessionContext;
|
||||
|
||||
use columnq::table::csv::to_datafusion_table;
|
||||
use columnq::table::csv::to_loaded_table;
|
||||
use columnq::table::{TableIoSource, TableLoadOption, TableOptionCsv, TableSource};
|
||||
|
||||
#[tokio::test]
|
||||
@ -34,26 +34,24 @@ async fn infer_csv_schema_by_selected_files() {
|
||||
assert!(table_source.schema_from_files.is_some());
|
||||
assert_eq!(table_source.schema, None);
|
||||
|
||||
match to_datafusion_table(&table_source, &ctx).await {
|
||||
match to_loaded_table(table_source.clone(), ctx.clone()).await {
|
||||
Err(columnq::table::Error::Generic { msg }) => {
|
||||
assert_eq!(&msg, "schema_from_files is an empty list");
|
||||
}
|
||||
_ => panic!("Empty schema_from_files should result in an error"),
|
||||
}
|
||||
|
||||
let t = to_datafusion_table(
|
||||
&table_source.with_schema_from_files(vec!["year=2023/month=1/p001.csv".to_string()]),
|
||||
&ctx,
|
||||
let t = to_loaded_table(
|
||||
table_source.with_schema_from_files(vec!["year=2023/month=1/p001.csv".to_string()]),
|
||||
ctx,
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
.unwrap()
|
||||
.table;
|
||||
|
||||
let mut builder = SchemaBuilder::new();
|
||||
builder.push(Field::new("ts", DataType::Int64, true));
|
||||
builder.push(Field::new("value", DataType::Float64, true));
|
||||
|
||||
assert_eq!(
|
||||
t.table.schema(),
|
||||
Arc::new(Schema::new(builder.finish().fields))
|
||||
);
|
||||
assert_eq!(t.schema(), Arc::new(Schema::new(builder.finish().fields)));
|
||||
}
|
||||
|
||||
61
flake.lock
Normal file
61
flake.lock
Normal file
@ -0,0 +1,61 @@
|
||||
{
|
||||
"nodes": {
|
||||
"flake-utils": {
|
||||
"inputs": {
|
||||
"systems": "systems"
|
||||
},
|
||||
"locked": {
|
||||
"lastModified": 1731533236,
|
||||
"narHash": "sha256-l0KFg5HjrsfsO/JpG+r7fRrqm12kzFHyUHqHCVpMMbI=",
|
||||
"owner": "numtide",
|
||||
"repo": "flake-utils",
|
||||
"rev": "11707dc2f618dd54ca8739b309ec4fc024de578b",
|
||||
"type": "github"
|
||||
},
|
||||
"original": {
|
||||
"owner": "numtide",
|
||||
"repo": "flake-utils",
|
||||
"type": "github"
|
||||
}
|
||||
},
|
||||
"nixpkgs": {
|
||||
"locked": {
|
||||
"lastModified": 1735412871,
|
||||
"narHash": "sha256-Qoz0ow6jDGUIBHxduc7Y1cjYFS71tvEGJV5Src/mj98=",
|
||||
"owner": "NixOS",
|
||||
"repo": "nixpkgs",
|
||||
"rev": "9f94733f93e4fe6e82f516efae007096e4ab5a21",
|
||||
"type": "github"
|
||||
},
|
||||
"original": {
|
||||
"owner": "NixOS",
|
||||
"ref": "release-24.11",
|
||||
"repo": "nixpkgs",
|
||||
"type": "github"
|
||||
}
|
||||
},
|
||||
"root": {
|
||||
"inputs": {
|
||||
"flake-utils": "flake-utils",
|
||||
"nixpkgs": "nixpkgs"
|
||||
}
|
||||
},
|
||||
"systems": {
|
||||
"locked": {
|
||||
"lastModified": 1681028828,
|
||||
"narHash": "sha256-Vy1rq5AaRuLzOxct8nz4T6wlgyUR7zLU309k9mBC768=",
|
||||
"owner": "nix-systems",
|
||||
"repo": "default",
|
||||
"rev": "da67096a3b9bf56a91d16901293e51ba5b49a27e",
|
||||
"type": "github"
|
||||
},
|
||||
"original": {
|
||||
"owner": "nix-systems",
|
||||
"repo": "default",
|
||||
"type": "github"
|
||||
}
|
||||
}
|
||||
},
|
||||
"root": "root",
|
||||
"version": 7
|
||||
}
|
||||
29
flake.nix
Normal file
29
flake.nix
Normal file
@ -0,0 +1,29 @@
|
||||
# Nix flake that provides a development shell for ROAPI.
{
|
||||
description = "ROAPI";
|
||||
inputs = {
|
||||
nixpkgs.url = "github:NixOS/nixpkgs/release-24.11";
|
||||
flake-utils.url = "github:numtide/flake-utils";
|
||||
};
|
||||
|
||||
outputs = { self, nixpkgs, flake-utils }:
|
||||
# The function below is evaluated by flake-utils' eachDefaultSystem with
# `system` bound to the target system.
flake-utils.lib.eachDefaultSystem
|
||||
(system:
|
||||
let pkgs = nixpkgs.legacyPackages.${system}; in
|
||||
{
|
||||
devShells.default = pkgs.mkShell {
|
||||
name = "roapi";
|
||||
buildInputs = [
|
||||
# for databases feature
|
||||
pkgs.openssl
|
||||
pkgs.pkg-config
|
||||
];
|
||||
# Exports the OpenSSL locations and prepends OpenSSL's pkgconfig directory
# to PKG_CONFIG_PATH - presumably so native builds that link against
# OpenSSL can find it; confirm.
shellHook = ''
|
||||
export OPENSSL_DIR="${pkgs.openssl.dev}"
|
||||
export OPENSSL_LIB_DIR="${pkgs.openssl.out}/lib"
|
||||
export OPENSSL_INCLUDE_DIR="${pkgs.openssl.dev}/include"
|
||||
export PKG_CONFIG_PATH="${pkgs.openssl.dev}/lib/pkgconfig:$PKG_CONFIG_PATH"
|
||||
'';
|
||||
};
|
||||
}
|
||||
);
|
||||
}
|
||||
Loading…
Reference in New Issue
Block a user