Support for Dynamic Table Deletion #340 (#343)
Some checks failed
build / build (push) Has been cancelled
build / database_test (push) Has been cancelled
build / object_store_memory_test (push) Has been cancelled
build / object_store_direct_test (push) Has been cancelled
build / openssl_build (push) Has been cancelled
build / mac_cross_build (push) Has been cancelled
build / Docker Image Build (push) Has been cancelled
columnq-cli release / Validate git tag (push) Has been cancelled
roapi release / Validate git tag (push) Has been cancelled
columnq-cli release / macos (push) Has been cancelled
columnq-cli release / windows (map[features:database-sqlite python-architecture:x64 target:x86_64-pc-windows-msvc]) (push) Has been cancelled
columnq-cli release / linux (map[features:rustls,database-sqlite image_tag:aarch64-musl manylinux:2014 name_suffix: rustflags: target:aarch64-unknown-linux-musl upload:true]) (push) Has been cancelled
columnq-cli release / linux (map[features:rustls,database-sqlite image_tag:x86_64-musl manylinux:2010 name_suffix: rustflags:-C target-cpu=skylake target:x86_64-unknown-linux-musl upload:true]) (push) Has been cancelled
columnq-cli release / PyPI Release (push) Has been cancelled
roapi release / macos (push) Has been cancelled
roapi release / windows (map[features:database-sqlite python-architecture:x64 target:x86_64-pc-windows-msvc]) (push) Has been cancelled
roapi release / linux (map[features:rustls,database-sqlite image_tag:aarch64-musl manylinux:2014 name_suffix: rustflags: target:aarch64-unknown-linux-musl upload:true]) (push) Has been cancelled
roapi release / linux (map[features:rustls,database-sqlite image_tag:x86_64-musl manylinux:2010 name_suffix: rustflags:-C target-cpu=skylake target:x86_64-unknown-linux-musl upload:true]) (push) Has been cancelled
roapi release / PyPI Release (push) Has been cancelled
roapi release / Docker Image Release (push) Has been cancelled

closes #340
This commit is contained in:
Brendan Ryback 2024-08-25 17:02:56 -04:00 committed by GitHub
parent 08ec089a84
commit b776dd34ce
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
8 changed files with 115 additions and 2 deletions

View File

@ -110,6 +110,12 @@ impl ColumnQ {
Ok(())
}
pub async fn drop_table(&mut self, t: &TableSource) -> Result<(), ColumnQError> {
self.schema_map.remove(&t.name);
self.dfctx.deregister_table(t.name.as_str())?;
Ok(())
}
pub fn register_object_storage(
&mut self,
url: &Url,

46
roapi/src/api/drop.rs Normal file
View File

@ -0,0 +1,46 @@
use std::{collections::HashMap, sync::Arc};
use axum::extract::{Extension, Json};
use columnq::error::ColumnQError;
use columnq::table::TableSource;
use log::info;
use serde::Deserialize;
use tokio::sync::Mutex;
use crate::context::RoapiContext;
use crate::error::ApiErrResp;
#[derive(Debug, Deserialize)]
pub struct SourceConfig {
#[serde(rename = "tableName")]
pub table_name: String,
}
pub async fn drop_table<H: RoapiContext>(
Extension(ctx): Extension<Arc<H>>,
Extension(tables): Extension<Arc<Mutex<HashMap<String, TableSource>>>>,
Json(body): Json<Vec<SourceConfig>>,
) -> Result<(), ApiErrResp> {
let mut tables = tables.lock().await;
for config in body {
if let Some(t) = tables.get(&config.table_name) {
info!("dropping table `{}`", t.name);
ctx.drop_table(t)
.await
.map_err(ColumnQError::from)
.map_err(ApiErrResp::drop_table)?;
tables.remove(&config.table_name);
info!("dropped table `{}`", config.table_name);
} else {
return Err(ApiErrResp::not_found(format!(
"Table `{}` source does not exist",
config.table_name
)));
}
}
Ok(())
}
pub async fn drop_table_read_only() -> Result<(), ApiErrResp> {
Err(ApiErrResp::read_only_mode())
}

View File

@ -56,6 +56,7 @@ pub fn encode_record_batches(
Ok(bytes_to_resp(payload, content_type.to_str()))
}
pub mod drop;
pub mod graphql;
pub mod health;
pub mod kv;

View File

@ -20,9 +20,13 @@ pub fn register_app_routes<H: RoapiContext>() -> Router {
);
if H::read_only_mode() {
router = router.route("/api/table", post(api::register::register_table_read_only));
router = router
.route("/api/table", post(api::register::register_table_read_only))
.route("/api/tables/drop", post(api::drop::drop_table_read_only));
} else {
router = router.route("/api/table", post(api::register::register_table::<H>));
router = router
.route("/api/table", post(api::register::register_table::<H>))
.route("/api/tables/drop", post(api::drop::drop_table::<H>));
}
router

View File

@ -62,6 +62,8 @@ pub trait RoapiContext: Send + Sync + 'static {
async fn load_table(&self, table: &TableSource) -> Result<(), ColumnQError>;
async fn drop_table(&self, table: &TableSource) -> Result<(), ColumnQError>;
async fn schemas(&self) -> Result<Vec<(String, arrow::datatypes::SchemaRef)>, ApiErrResp>;
async fn schemas_json_bytes(&self) -> Result<Vec<u8>, ApiErrResp>;
@ -108,6 +110,13 @@ impl RoapiContext for RawRoapiContext {
))
}
#[inline]
async fn drop_table(&self, _table: &TableSource) -> Result<(), ColumnQError> {
Err(ColumnQError::Generic(
"Table update not supported in read only mode".to_string(),
))
}
#[inline]
async fn schemas(&self) -> Result<Vec<(String, arrow::datatypes::SchemaRef)>, ApiErrResp> {
Ok(self
@ -209,6 +218,12 @@ impl RoapiContext for ConcurrentRoapiContext {
ctx.cq.load_table(table).await
}
#[inline]
async fn drop_table(&self, table: &TableSource) -> Result<(), ColumnQError> {
let mut ctx = self.write().await;
ctx.cq.drop_table(table).await
}
#[inline]
async fn table_names(&self) -> Vec<String> {
let ctx = self.read().await;

View File

@ -97,6 +97,14 @@ impl ApiErrResp {
}
}
pub fn drop_table(error: ColumnQError) -> Self {
Self {
code: http::StatusCode::INTERNAL_SERVER_ERROR,
error: "drop_table".to_string(),
message: error.to_string(),
}
}
pub fn load_table(error: ColumnQError) -> Self {
Self {
code: http::StatusCode::INTERNAL_SERVER_ERROR,

View File

@ -4,6 +4,7 @@ use std::collections::HashMap;
use async_process::Command;
use columnq::arrow::datatypes::Schema;
use serde_json::json;
#[tokio::test]
async fn test_schema() {
@ -18,6 +19,26 @@ async fn test_schema() {
assert!(body.contains_key("spacex_launches"));
}
#[tokio::test]
async fn test_drop_table() {
let json_table = helpers::get_spacex_table();
let (app, address) = helpers::test_api_app_with_tables(vec![json_table]).await;
tokio::spawn(app.run_until_stopped());
let get_response =
helpers::http_get(&format!("{address}/api/tables/spacex_launches"), None).await;
assert_eq!(get_response.status(), 200);
let drop_response = helpers::http_post_json(
&format!("{address}/api/tables/drop"),
json!([{"tableName": "spacex_launches"}]).to_string(),
)
.await;
assert_eq!(drop_response.status(), 200);
let failed_get_response =
helpers::http_get(&format!("{address}/api/tables/spaces_launches"), None).await;
assert_eq!(failed_get_response.status(), 400);
}
#[tokio::test]
async fn test_uk_cities_sql_post() {
let table = helpers::get_uk_cities_table();

View File

@ -35,6 +35,7 @@ pub async fn test_api_app(
},
tables,
reload_interval: Some(Duration::from_secs(1000)),
disable_read_only: true,
kvstores,
..Default::default()
};
@ -68,6 +69,17 @@ pub async fn http_post(url: &str, payload: impl Into<reqwest::Body>) -> reqwest:
.expect("Unable to execute POST request")
}
#[allow(dead_code)]
pub async fn http_post_json(url: &str, payload: impl Into<reqwest::Body>) -> reqwest::Response {
reqwest::Client::new()
.post(url)
.header("Content-Type", "application/json")
.body(payload)
.send()
.await
.expect("Unable to execute POST request")
}
#[allow(dead_code)]
pub fn get_spacex_table() -> TableSource {
let json_source_path = test_data_path("spacex_launches.json");