diff --git a/README.md b/README.md index 8dcb831..3b94270 100644 --- a/README.md +++ b/README.md @@ -205,6 +205,28 @@ To query tables using a subset of standard SQL, send the query through `POST` request to `/api/sql` endpoint. This is the only query interface that supports table joins. + +### Key value lookup + +You can pick two columns from a table to use a key and value to create a quick +keyvalue store API by adding the following lines to the config: + +```yaml +kvstores: + - name: "launch_name" + uri: "test_data/spacex_launches.json" + key: id + value: name +``` + +Key value lookup can be done through simple HTTP GET requests: + +```bash +curl -v localhost:8080/api/kv/launch_name/600f9a8d8f798e2a4d5f979e +Starlink-21 (v1.0)% +``` + + ## Features Query layer: @@ -220,6 +242,7 @@ Query layer: - [ ] gRPC - [ ] MySQL - [ ] Postgres +- [x] Key value lookup Response serialization: diff --git a/columnq/src/columnq.rs b/columnq/src/columnq.rs index b8be08a..509a628 100644 --- a/columnq/src/columnq.rs +++ b/columnq/src/columnq.rs @@ -1,16 +1,22 @@ +use std::collections::hash_map::Entry; use std::collections::HashMap; +use std::sync::Arc; use datafusion::arrow; +use datafusion::arrow::array::as_string_array; +use datafusion::arrow::array::StringArray; pub use datafusion::execution::context::ExecutionConfig; use datafusion::execution::context::ExecutionContext; +use datafusion::physical_plan::collect; use crate::error::{ColumnQError, QueryError}; use crate::query; -use crate::table::{self, TableSource}; +use crate::table::{self, KeyValueSource, TableSource}; pub struct ColumnQ { dfctx: ExecutionContext, schema_map: HashMap, + kv_catalog: HashMap>>, } impl ColumnQ { @@ -21,7 +27,11 @@ impl ColumnQ { pub fn new_with_config(config: ExecutionConfig) -> Self { let dfctx = ExecutionContext::with_config(config); let schema_map = HashMap::::new(); - Self { dfctx, schema_map } + Self { + dfctx, + schema_map, + kv_catalog: HashMap::new(), + } } pub async fn load_table(&mut self, t: &TableSource) -> Result<(), ColumnQError> { @@ -33,6 +43,62 @@ impl ColumnQ { Ok(()) } + pub async fn load_kv(&mut self, kv: KeyValueSource) -> Result<(), ColumnQError> { + use datafusion::arrow::datatypes::DataType; + + let kv_entry = self.kv_catalog.entry(kv.name.clone()); + let (key, value) = (kv.key.clone(), kv.value.clone()); + let table = table::load(&kv.into()).await?; + let schema = table.schema(); + let key_schema_idx = schema.index_of(&key)?; + if schema.field(key_schema_idx).data_type() != &DataType::Utf8 { + return Err(ColumnQError::invalid_kv_key_type()); + } + let val_schema_idx = schema.index_of(&value)?; + let projections = Some(vec![key_schema_idx, val_schema_idx]); + + let filters = &[]; + let exec_plan = table.scan(&projections, filters, None).await?; + let batches = collect(exec_plan, self.dfctx.runtime_env()).await?; + let mut kv = HashMap::new(); + for batch in batches { + let col_key = batch + .column(0) + .as_any() + .downcast_ref::() + .ok_or_else(ColumnQError::invalid_kv_key_type)?; + // TODO: take ownership of array data + let key_iter = col_key.iter(); + let col_val = batch.column(1); + match col_val.data_type() { + DataType::Utf8 => { + let val_iter = as_string_array(col_val).iter(); + key_iter + .zip(val_iter) + .for_each(|(key, val): (Option<&str>, Option<&str>)| { + // TODO: support null as value? error out on null? + if let (Some(key), Some(val)) = (key, val) { + kv.insert(key.to_string(), val.to_string()); + } + }); + } + other => { + todo!("unsupported type: {}", other); + } + } + } + + match kv_entry { + Entry::Occupied(mut entry) => { + entry.insert(Arc::new(kv)); + } + Entry::Vacant(entry) => { + entry.insert(Arc::new(kv)); + } + } + Ok(()) + } + pub fn schema_map(&self) -> &HashMap { &self.schema_map } @@ -58,6 +124,14 @@ impl ColumnQ { ) -> Result, QueryError> { query::rest::query_table(&self.dfctx, table_name, params).await } + + pub fn kv_get(&self, kv_name: &str, key: &str) -> Result, QueryError> { + let map = self + .kv_catalog + .get(kv_name) + .ok_or_else(|| QueryError::invalid_kv_name(kv_name))?; + Ok(map.get(key)) + } } impl Default for ColumnQ { diff --git a/columnq/src/error.rs b/columnq/src/error.rs index e61e17c..4abbd2f 100644 --- a/columnq/src/error.rs +++ b/columnq/src/error.rs @@ -87,6 +87,10 @@ impl ColumnQError { pub fn s3_obj_missing_key() -> Self { Self::S3Store("Missing key in S3 object list item".to_string()) } + + pub fn invalid_kv_key_type() -> Self { + Self::Generic("keyvalue store key datatype should be a string".to_string()) + } } impl From for ColumnQError { @@ -156,4 +160,11 @@ impl QueryError { message: format!("Failed to load table {}: {}", table_name, error), } } + + pub fn invalid_kv_name(kv_name: &str) -> Self { + Self { + error: "invalid_kv_name".to_string(), + message: format!("keyvalue store name `{}` doesn't exist", kv_name), + } + } } diff --git a/columnq/src/table/mod.rs b/columnq/src/table/mod.rs index 0309b88..ba242dd 100644 --- a/columnq/src/table/mod.rs +++ b/columnq/src/table/mod.rs @@ -300,6 +300,18 @@ pub struct TableSource { pub batch_size: usize, } +impl From for TableSource { + fn from(kv: KeyValueSource) -> Self { + Self { + name: kv.name, + io_source: kv.io_source, + schema: kv.schema, + option: kv.option, + batch_size: Self::default_batch_size(), + } + } +} + impl TableSource { pub fn new(name: impl Into, source: impl Into) -> Self { Self { @@ -497,6 +509,46 @@ pub fn parse_table_uri_arg(uri_arg: &str) -> Result { } } +#[derive(Deserialize, Clone, Debug, Eq, PartialEq)] +#[serde(deny_unknown_fields)] +pub struct KeyValueSource { + pub name: String, + pub key: String, + pub value: String, + #[serde(flatten)] + pub io_source: TableIoSource, + pub schema: Option, + pub option: Option, +} + +impl KeyValueSource { + pub fn new( + name: impl Into, + source: impl Into, + key: impl Into, + value: impl Into, + ) -> Self { + Self { + name: name.into(), + key: key.into(), + value: value.into(), + io_source: source.into(), + schema: None, + option: None, + } + } + + pub fn with_option(mut self, option: impl Into) -> Self { + self.option = Some(option.into()); + self + } + + pub fn with_schema(mut self, schema: impl Into) -> Self { + self.schema = Some(schema.into()); + self + } +} + #[cfg(test)] mod tests { use super::*; diff --git a/roapi-http/src/api/kv.rs b/roapi-http/src/api/kv.rs new file mode 100644 index 0000000..bca2402 --- /dev/null +++ b/roapi-http/src/api/kv.rs @@ -0,0 +1,15 @@ +use crate::error::ApiErrResp; +use axum::extract::{self, Extension}; +use axum::response::IntoResponse; +use std::sync::Arc; + +use super::HandlerCtx; + +pub async fn get( + Extension(ctx): extract::Extension>, + extract::Path((kv_name, key)): extract::Path<(String, String)>, +) -> Result { + ctx.kv_get(&kv_name, &key) + .await? + .ok_or_else(|| ApiErrResp::not_found(format!("key {} not found", key))) +} diff --git a/roapi-http/src/api/mod.rs b/roapi-http/src/api/mod.rs index 920eec5..1657f74 100644 --- a/roapi-http/src/api/mod.rs +++ b/roapi-http/src/api/mod.rs @@ -28,8 +28,8 @@ impl RawHandlerContext { pub async fn new(config: &Config) -> anyhow::Result { let mut cq = ColumnQ::new(); - if config.tables.is_empty() { - anyhow::bail!("No table found in tables config"); + if config.tables.is_empty() && config.kvstores.is_empty() { + anyhow::bail!("No table nor kvstore found in config"); } for t in config.tables.iter() { @@ -38,6 +38,12 @@ impl RawHandlerContext { info!("registered `{}` as table `{}`", t.io_source, t.name); } + for k in config.kvstores.iter() { + info!("loading `{}` as kv store `{}`", k.io_source, k.name); + cq.load_kv(k.clone()).await?; + info!("registered `{}` as kv store `{}`", k.io_source, k.name); + } + Ok(Self { cq }) } } @@ -69,6 +75,8 @@ pub trait HandlerCtx: Send + Sync + 'static { table_name: &str, params: &HashMap, ) -> Result, QueryError>; + + async fn kv_get(&self, kv_name: &str, key: &str) -> Result, QueryError>; } #[async_trait] @@ -129,6 +137,11 @@ impl HandlerCtx for RawHandlerContext { ) -> Result, QueryError> { self.cq.query_rest_table(table_name, params).await } + + #[inline] + async fn kv_get(&self, kv_name: &str, key: &str) -> Result, QueryError> { + Ok(self.cq.kv_get(kv_name, key)?.cloned()) + } } #[async_trait] @@ -193,6 +206,12 @@ impl HandlerCtx for ConcurrentHandlerContext { let ctx = self.read().await; ctx.cq.query_rest_table(table_name, params).await } + + #[inline] + async fn kv_get(&self, kv_name: &str, key: &str) -> Result, QueryError> { + let ctx = self.read().await; + Ok(ctx.cq.kv_get(kv_name, key)?.cloned()) + } } #[inline] @@ -242,6 +261,7 @@ pub fn encode_record_batches( } pub mod graphql; +pub mod kv; pub mod register; pub mod rest; pub mod routes; diff --git a/roapi-http/src/api/routes.rs b/roapi-http/src/api/routes.rs index 64d3a68..a2d0afd 100644 --- a/roapi-http/src/api/routes.rs +++ b/roapi-http/src/api/routes.rs @@ -9,6 +9,7 @@ pub fn register_app_routes() -> Router { let mut router = Router::new() .route("/api/tables/:table_name", get(api::rest::get_table::)) .route("/api/sql", post(api::sql::post::)) + .route("/api/kv/:kv_name/:key", get(api::kv::get::)) .route("/api/graphql", post(api::graphql::post::)) .route("/api/schema", get(api::schema::schema::)); diff --git a/roapi-http/src/config.rs b/roapi-http/src/config.rs index 5b7a0e7..06d2a91 100644 --- a/roapi-http/src/config.rs +++ b/roapi-http/src/config.rs @@ -3,6 +3,7 @@ use serde_derive::Deserialize; use anyhow::{Context, Result}; use columnq::table::parse_table_uri_arg; +use columnq::table::KeyValueSource; use columnq::table::TableSource; use std::fs; @@ -12,6 +13,8 @@ pub struct Config { pub tables: Vec, #[serde(default)] pub disable_read_only: bool, + #[serde(default)] + pub kvstores: Vec, } fn table_arg() -> clap::Arg<'static> { diff --git a/roapi-http/src/error.rs b/roapi-http/src/error.rs index 2160785..cddf0f3 100644 --- a/roapi-http/src/error.rs +++ b/roapi-http/src/error.rs @@ -25,11 +25,11 @@ where } impl ApiErrResp { - pub fn not_found(message: &str) -> Self { + pub fn not_found(message: impl Into) -> Self { Self { code: http::StatusCode::NOT_FOUND, error: "not_found".to_string(), - message: message.to_string(), + message: message.into(), } } diff --git a/roapi-http/tests/api_test.rs b/roapi-http/tests/api_test.rs index be85848..0179ccc 100644 --- a/roapi-http/tests/api_test.rs +++ b/roapi-http/tests/api_test.rs @@ -10,7 +10,7 @@ use tokio; #[tokio::test] async fn test_schema() -> Result<()> { let json_table = helpers::get_spacex_table(); - let (app, address) = helpers::test_api_app(vec![json_table]).await; + let (app, address) = helpers::test_api_app_with_tables(vec![json_table]).await; tokio::spawn(app.run_until_stopped()); let response = helpers::http_get(&format!("{}/api/schema", address), None).await; @@ -24,7 +24,7 @@ async fn test_schema() -> Result<()> { #[tokio::test] async fn test_uk_cities_sql_post() -> Result<()> { let table = helpers::get_uk_cities_table(); - let (app, address) = helpers::test_api_app(vec![table]).await; + let (app, address) = helpers::test_api_app_with_tables(vec![table]).await; tokio::spawn(app.run_until_stopped()); let response = helpers::http_post( @@ -51,7 +51,7 @@ async fn test_uk_cities_sql_post() -> Result<()> { #[tokio::test] async fn test_sql_invalid_post() -> Result<()> { let table = helpers::get_uk_cities_table(); - let (app, address) = helpers::test_api_app(vec![table]).await; + let (app, address) = helpers::test_api_app_with_tables(vec![table]).await; tokio::spawn(app.run_until_stopped()); let response = helpers::http_post(&format!("{}/api/sql", address), "SELECT city FROM").await; @@ -72,7 +72,7 @@ async fn test_sql_invalid_post() -> Result<()> { #[tokio::test] async fn test_ubuntu_ami_sql_post() -> Result<()> { let table = helpers::get_ubuntu_ami_table(); - let (app, address) = helpers::test_api_app(vec![table]).await; + let (app, address) = helpers::test_api_app_with_tables(vec![table]).await; tokio::spawn(app.run_until_stopped()); let response = helpers::http_post( @@ -99,7 +99,7 @@ async fn test_ubuntu_ami_sql_post() -> Result<()> { #[tokio::test] async fn test_rest_get() -> Result<()> { let table = helpers::get_ubuntu_ami_table(); - let (app, address) = helpers::test_api_app(vec![table]).await; + let (app, address) = helpers::test_api_app_with_tables(vec![table]).await; tokio::spawn(app.run_until_stopped()); let accept_headers = vec![ None, @@ -146,7 +146,7 @@ async fn test_rest_get() -> Result<()> { #[tokio::test] async fn test_graphql_post_query_op() -> Result<()> { let table = helpers::get_ubuntu_ami_table(); - let (app, address) = helpers::test_api_app(vec![table]).await; + let (app, address) = helpers::test_api_app_with_tables(vec![table]).await; tokio::spawn(app.run_until_stopped()); let response = helpers::http_post( @@ -192,7 +192,7 @@ async fn test_graphql_post_query_op() -> Result<()> { #[tokio::test] async fn test_graphql_post_selection() -> Result<()> { let table = helpers::get_ubuntu_ami_table(); - let (app, address) = helpers::test_api_app(vec![table]).await; + let (app, address) = helpers::test_api_app_with_tables(vec![table]).await; tokio::spawn(app.run_until_stopped()); let response = helpers::http_post( @@ -236,7 +236,7 @@ async fn test_graphql_post_selection() -> Result<()> { #[tokio::test] async fn test_http2() -> Result<()> { let table = helpers::get_uk_cities_table(); - let (app, address) = helpers::test_api_app(vec![table]).await; + let (app, address) = helpers::test_api_app_with_tables(vec![table]).await; tokio::spawn(app.run_until_stopped()); // ~ % curl -sI --http2-prior-knowledge localhost:8080/api/schema -o/dev/null -w '%{http_version}\n' @@ -263,3 +263,23 @@ async fn test_http2() -> Result<()> { assert_eq!(http_version, two); Ok(()) } + +#[tokio::test] +async fn test_kvstore_get() -> Result<()> { + let store = helpers::get_spacex_launch_name_kvstore(); + let (app, address) = helpers::test_api_app_with_kvstores(vec![store]).await; + tokio::spawn(app.run_until_stopped()); + + let response = helpers::http_get( + &format!( + "{}/api/kv/spacex_launch_name/600f9a8d8f798e2a4d5f979e", + address + ), + None, + ) + .await; + + assert_eq!(response.status(), 200); + assert_eq!(response.text().await?, "Starlink-21 (v1.0)"); + Ok(()) +} diff --git a/roapi-http/tests/helpers.rs b/roapi-http/tests/helpers.rs index 1abd44c..30f4b11 100644 --- a/roapi-http/tests/helpers.rs +++ b/roapi-http/tests/helpers.rs @@ -1,7 +1,7 @@ use std::path::PathBuf; use columnq::datafusion::arrow; -use columnq::table::{TableColumn, TableLoadOption, TableSchema, TableSource}; +use columnq::table::{KeyValueSource, TableColumn, TableLoadOption, TableSchema, TableSource}; use roapi_http::config::Config; use roapi_http::startup::Application; @@ -12,11 +12,23 @@ pub fn test_data_path(relative_path: &str) -> String { d.to_string_lossy().to_string() } -pub async fn test_api_app(tables: Vec) -> (Application, String) { +pub async fn test_api_app_with_tables(tables: Vec) -> (Application, String) { + test_api_app(tables, vec![]).await +} + +pub async fn test_api_app_with_kvstores(kvstores: Vec) -> (Application, String) { + test_api_app(vec![], kvstores).await +} + +pub async fn test_api_app( + tables: Vec, + kvstores: Vec, +) -> (Application, String) { let config = Config { addr: "localhost:0".to_string().into(), - tables: tables, + tables, disable_read_only: false, + kvstores, }; let app = Application::build(config) @@ -110,3 +122,12 @@ pub fn get_ubuntu_ami_table() -> TableSource { ], }) } + +pub fn get_spacex_launch_name_kvstore() -> KeyValueSource { + KeyValueSource::new( + "spacex_launch_name", + test_data_path("spacex_launches.json"), + "id", + "name", + ) +}