diff --git a/roapi/src/config.rs b/roapi/src/config.rs index 732fc50..ed334c0 100644 --- a/roapi/src/config.rs +++ b/roapi/src/config.rs @@ -1,11 +1,12 @@ use serde_derive::Deserialize; -use anyhow::{Context, Result}; +use anyhow::{bail, Context, Result}; use columnq::table::parse_table_uri_arg; use columnq::table::KeyValueSource; use columnq::table::TableSource; use std::fs; +use std::time::Duration; #[derive(Deserialize, Default, Clone)] pub struct AddrConfig { @@ -17,6 +18,7 @@ pub struct AddrConfig { pub struct Config { pub addr: AddrConfig, pub tables: Vec, + pub reload_interval: Option, #[serde(default)] pub disable_read_only: bool, #[serde(default)] @@ -64,6 +66,15 @@ fn read_only_arg() -> clap::Arg<'static> { .short('d') } +fn reload_interval_arg() -> clap::Arg<'static> { + clap::Arg::new("reload-interval") + .help("maximum age in seconds before triggering rescan and reload of the tables") + .required(false) + .takes_value(true) + .long("reload-interval") + .short('m') +} + fn config_arg() -> clap::Arg<'static> { clap::Arg::new("config") .help("config file path") @@ -86,6 +97,7 @@ pub fn get_configuration() -> Result { address_postgres_arg(), config_arg(), read_only_arg(), + reload_interval_arg(), table_arg(), ]) .get_matches(); @@ -118,5 +130,14 @@ pub fn get_configuration() -> Result { config.disable_read_only = true; } + if let Some(reload_interval) = matches.value_of("reload-interval") { + if !config.disable_read_only { + bail!("Table reload not supported in read-only mode. Try specify the --disable-read-only option."); + } + config.reload_interval = Some(Duration::from_secs( + reload_interval.to_string().parse().unwrap(), + )); + } + Ok(config) } diff --git a/roapi/src/startup.rs b/roapi/src/startup.rs index 3c3f2f3..5883bc4 100644 --- a/roapi/src/startup.rs +++ b/roapi/src/startup.rs @@ -1,18 +1,46 @@ use std::collections::HashMap; use std::sync::Arc; +use std::time::Duration; use columnq::table::TableSource; -use log::info; +use log::{error, info}; use tokio::sync::{Mutex, RwLock}; +use tokio::time; use crate::config::Config; -use crate::context::ConcurrentRoapiContext; use crate::context::RawRoapiContext; +use crate::context::{ConcurrentRoapiContext, RoapiContext}; use crate::server; +pub struct TableReloader { + reload_interval: Duration, + ctx_ext: Arc>, + tables: Arc>>, +} + +impl TableReloader { + pub async fn run(self) { + let mut interval = time::interval(self.reload_interval); + loop { + interval.tick().await; + for (table_name, table) in self.tables.lock().await.iter() { + match self.ctx_ext.load_table(table).await { + Ok(_) => { + info!("table {} reloaded", table_name); + } + Err(err) => { + error!("failed to reload table {}: {:?}", table_name, err); + } + } + } + } + } +} + pub struct Application { http_addr: std::net::SocketAddr, http_server: server::http::HttpApiServer, + table_reloader: Option, postgres_server: Box, } @@ -41,6 +69,13 @@ impl Application { ) .await, ); + + let table_reloader = config.reload_interval.map(|reload_interval| TableReloader { + reload_interval, + tables: tables.clone(), + ctx_ext: ctx_ext.clone(), + }); + let (http_server, http_addr) = server::http::build_http_server::( ctx_ext, tables, @@ -52,6 +87,7 @@ impl Application { http_addr, http_server, postgres_server, + table_reloader, }) } else { let ctx_ext = Arc::new(handler_ctx); @@ -74,6 +110,7 @@ impl Application { http_addr, http_server, postgres_server, + table_reloader: None, }) } } @@ -98,7 +135,11 @@ impl Application { .await .expect("Failed to run postgres server"); }); - + if let Some(table_reloader) = self.table_reloader { + tokio::spawn(async move { + table_reloader.run().await; + }); + } info!("🚀 Listening on {} for HTTP traffic...", self.http_addr); Ok(self.http_server.await?) } diff --git a/roapi/tests/helpers.rs b/roapi/tests/helpers.rs index 203a0e5..c98adec 100644 --- a/roapi/tests/helpers.rs +++ b/roapi/tests/helpers.rs @@ -1,4 +1,5 @@ use std::path::PathBuf; +use std::time::Duration; use columnq::datafusion::arrow; use columnq::table::{KeyValueSource, TableColumn, TableLoadOption, TableSchema, TableSource}; @@ -16,6 +17,7 @@ pub async fn test_api_app_with_tables(tables: Vec) -> (Application, test_api_app(tables, vec![]).await } +#[allow(dead_code)] pub async fn test_api_app_with_kvstores(kvstores: Vec) -> (Application, String) { test_api_app(vec![], kvstores).await } @@ -30,6 +32,7 @@ pub async fn test_api_app( postgres: "localhost:0".to_string().into(), }, tables, + reload_interval: Some(Duration::from_secs(1000)), disable_read_only: false, kvstores, }; @@ -42,6 +45,7 @@ pub async fn test_api_app( (app, http_base) } +#[allow(dead_code)] pub async fn http_get(url: &str, accept: Option<&str>) -> reqwest::Response { let request = reqwest::Client::new().get(url); let request = if let Some(accept) = accept { @@ -52,6 +56,7 @@ pub async fn http_get(url: &str, accept: Option<&str>) -> reqwest::Response { request.send().await.expect("Unable to execute GET request") } +#[allow(dead_code)] pub async fn http_post(url: &str, payload: impl Into) -> reqwest::Response { reqwest::Client::new() .post(url) @@ -66,6 +71,7 @@ pub fn get_spacex_table() -> TableSource { TableSource::new("spacex_launches".to_string(), json_source_path) } +#[allow(dead_code)] pub fn get_uk_cities_table() -> TableSource { TableSource::new( "uk_cities".to_string(), @@ -73,6 +79,7 @@ pub fn get_uk_cities_table() -> TableSource { ) } +#[allow(dead_code)] pub fn get_ubuntu_ami_table() -> TableSource { TableSource::new("ubuntu_ami", test_data_path("ubuntu-ami.json")) .with_option(TableLoadOption::json { @@ -125,6 +132,7 @@ pub fn get_ubuntu_ami_table() -> TableSource { }) } +#[allow(dead_code)] pub fn get_spacex_launch_name_kvstore() -> KeyValueSource { KeyValueSource::new( "spacex_launch_name",