feat: automatic table refresh based on reload-interval config (#198)

Co-authored-by: Qingping Hou <dave2008713@gmail.com>
This commit is contained in:
Charlie Harrington 2022-10-15 22:00:55 -07:00 committed by GitHub
parent e9d0a03b89
commit cc68167d9d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 74 additions and 4 deletions

View File

@ -1,11 +1,12 @@
use serde_derive::Deserialize;
use anyhow::{Context, Result};
use anyhow::{bail, Context, Result};
use columnq::table::parse_table_uri_arg;
use columnq::table::KeyValueSource;
use columnq::table::TableSource;
use std::fs;
use std::time::Duration;
#[derive(Deserialize, Default, Clone)]
pub struct AddrConfig {
@ -17,6 +18,7 @@ pub struct AddrConfig {
pub struct Config {
pub addr: AddrConfig,
pub tables: Vec<TableSource>,
pub reload_interval: Option<Duration>,
#[serde(default)]
pub disable_read_only: bool,
#[serde(default)]
@ -64,6 +66,15 @@ fn read_only_arg() -> clap::Arg<'static> {
.short('d')
}
/// Builds the optional `--reload-interval` (short form `-m`) command-line argument.
///
/// The argument takes a value; per its help text, that value is the maximum
/// table age in seconds before a rescan and reload of the tables is triggered.
fn reload_interval_arg() -> clap::Arg<'static> {
    let arg = clap::Arg::new("reload-interval")
        .long("reload-interval")
        .short('m')
        .takes_value(true)
        .required(false);
    arg.help("maximum age in seconds before triggering rescan and reload of the tables")
}
fn config_arg() -> clap::Arg<'static> {
clap::Arg::new("config")
.help("config file path")
@ -86,6 +97,7 @@ pub fn get_configuration() -> Result<Config, anyhow::Error> {
address_postgres_arg(),
config_arg(),
read_only_arg(),
reload_interval_arg(),
table_arg(),
])
.get_matches();
@ -118,5 +130,14 @@ pub fn get_configuration() -> Result<Config, anyhow::Error> {
config.disable_read_only = true;
}
// A `--reload-interval` given on the command line replaces whatever
// `config.reload_interval` held before this point.
if let Some(reload_interval) = matches.value_of("reload-interval") {
    // Reloading is rejected unless read-only mode has been disabled.
    if !config.disable_read_only {
        bail!("Table reload not supported in read-only mode. Try specify the --disable-read-only option.");
    }
    // The value is user input: surface a malformed number as an error
    // instead of panicking.
    let seconds: u64 = reload_interval.parse().with_context(|| {
        format!(
            "Invalid --reload-interval value `{}`: expected a whole number of seconds",
            reload_interval
        )
    })?;
    // A zero period would make `tokio::time::interval` panic once the
    // reloader task starts, so refuse it up front.
    if seconds == 0 {
        bail!("Invalid --reload-interval value `0`: the interval must be at least 1 second");
    }
    config.reload_interval = Some(Duration::from_secs(seconds));
}
Ok(config)
}

View File

@ -1,18 +1,46 @@
use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;
use columnq::table::TableSource;
use log::info;
use log::{error, info};
use tokio::sync::{Mutex, RwLock};
use tokio::time;
use crate::config::Config;
use crate::context::ConcurrentRoapiContext;
use crate::context::RawRoapiContext;
use crate::context::{ConcurrentRoapiContext, RoapiContext};
use crate::server;
/// Background job state for periodically reloading tables.
///
/// `run` consumes this value and, on every tick of `reload_interval`, calls
/// `load_table` on `ctx_ext` for each entry in `tables`.
pub struct TableReloader {
// Period handed to `tokio::time::interval` to pace reload passes.
reload_interval: Duration,
// Shared context on which `load_table` is invoked for every table.
ctx_ext: Arc<RwLock<RawRoapiContext>>,
// Table name -> source definition; locked and iterated on each reload pass.
tables: Arc<Mutex<HashMap<String, TableSource>>>,
}
impl TableReloader {
pub async fn run(self) {
let mut interval = time::interval(self.reload_interval);
loop {
interval.tick().await;
for (table_name, table) in self.tables.lock().await.iter() {
match self.ctx_ext.load_table(table).await {
Ok(_) => {
info!("table {} reloaded", table_name);
}
Err(err) => {
error!("failed to reload table {}: {:?}", table_name, err);
}
}
}
}
}
}
/// The assembled application: the HTTP server, the postgres server and an
/// optional background table reloader.
pub struct Application {
// Address the HTTP server listens on; reported in the startup log line.
http_addr: std::net::SocketAddr,
// HTTP API server future; awaited to serve HTTP traffic.
http_server: server::http::HttpApiServer,
// Periodic table reloader; `None` when no reloader was configured for this build path.
table_reloader: Option<TableReloader>,
// Postgres server, stored as a boxed `RunnableServer` trait object.
postgres_server: Box<dyn server::RunnableServer>,
}
@ -41,6 +69,13 @@ impl Application {
)
.await,
);
let table_reloader = config.reload_interval.map(|reload_interval| TableReloader {
reload_interval,
tables: tables.clone(),
ctx_ext: ctx_ext.clone(),
});
let (http_server, http_addr) = server::http::build_http_server::<ConcurrentRoapiContext>(
ctx_ext,
tables,
@ -52,6 +87,7 @@ impl Application {
http_addr,
http_server,
postgres_server,
table_reloader,
})
} else {
let ctx_ext = Arc::new(handler_ctx);
@ -74,6 +110,7 @@ impl Application {
http_addr,
http_server,
postgres_server,
table_reloader: None,
})
}
}
@ -98,7 +135,11 @@ impl Application {
.await
.expect("Failed to run postgres server");
});
if let Some(table_reloader) = self.table_reloader {
tokio::spawn(async move {
table_reloader.run().await;
});
}
info!("🚀 Listening on {} for HTTP traffic...", self.http_addr);
Ok(self.http_server.await?)
}

View File

@ -1,4 +1,5 @@
use std::path::PathBuf;
use std::time::Duration;
use columnq::datafusion::arrow;
use columnq::table::{KeyValueSource, TableColumn, TableLoadOption, TableSchema, TableSource};
@ -16,6 +17,7 @@ pub async fn test_api_app_with_tables(tables: Vec<TableSource>) -> (Application,
test_api_app(tables, vec![]).await
}
/// Builds the test application through `test_api_app` with no tables and the
/// given key-value stores, returning that helper's `(Application, String)` pair.
// NOTE(review): presumably silenced because not every test binary that includes
// this helper module calls it — confirm.
#[allow(dead_code)]
pub async fn test_api_app_with_kvstores(kvstores: Vec<KeyValueSource>) -> (Application, String) {
    let no_tables = Vec::new();
    test_api_app(no_tables, kvstores).await
}
@ -30,6 +32,7 @@ pub async fn test_api_app(
postgres: "localhost:0".to_string().into(),
},
tables,
reload_interval: Some(Duration::from_secs(1000)),
disable_read_only: false,
kvstores,
};
@ -42,6 +45,7 @@ pub async fn test_api_app(
(app, http_base)
}
#[allow(dead_code)]
pub async fn http_get(url: &str, accept: Option<&str>) -> reqwest::Response {
let request = reqwest::Client::new().get(url);
let request = if let Some(accept) = accept {
@ -52,6 +56,7 @@ pub async fn http_get(url: &str, accept: Option<&str>) -> reqwest::Response {
request.send().await.expect("Unable to execute GET request")
}
#[allow(dead_code)]
pub async fn http_post(url: &str, payload: impl Into<reqwest::Body>) -> reqwest::Response {
reqwest::Client::new()
.post(url)
@ -66,6 +71,7 @@ pub fn get_spacex_table() -> TableSource {
TableSource::new("spacex_launches".to_string(), json_source_path)
}
#[allow(dead_code)]
pub fn get_uk_cities_table() -> TableSource {
TableSource::new(
"uk_cities".to_string(),
@ -73,6 +79,7 @@ pub fn get_uk_cities_table() -> TableSource {
)
}
#[allow(dead_code)]
pub fn get_ubuntu_ami_table() -> TableSource {
TableSource::new("ubuntu_ami", test_data_path("ubuntu-ami.json"))
.with_option(TableLoadOption::json {
@ -125,6 +132,7 @@ pub fn get_ubuntu_ami_table() -> TableSource {
})
}
#[allow(dead_code)]
pub fn get_spacex_launch_name_kvstore() -> KeyValueSource {
KeyValueSource::new(
"spacex_launch_name",