add kvstore support (#161)

This commit is contained in:
QP Hou 2022-04-02 19:46:58 -07:00 committed by GitHub
parent 5d41a28417
commit 49aeaf1d71
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 257 additions and 17 deletions

View File

@ -205,6 +205,28 @@ To query tables using a subset of standard SQL, send the query through `POST`
request to `/api/sql` endpoint. This is the only query interface that supports
table joins.
### Key value lookup
You can pick two columns from a table to use a key and value to create a quick
keyvalue store API by adding the following lines to the config:
```yaml
kvstores:
- name: "launch_name"
uri: "test_data/spacex_launches.json"
key: id
value: name
```
Key value lookup can be done through simple HTTP GET requests:
```bash
curl -v localhost:8080/api/kv/launch_name/600f9a8d8f798e2a4d5f979e
Starlink-21 (v1.0)%
```
## Features
Query layer:
@ -220,6 +242,7 @@ Query layer:
- [ ] gRPC
- [ ] MySQL
- [ ] Postgres
- [x] Key value lookup
Response serialization:

View File

@ -1,16 +1,22 @@
use std::collections::hash_map::Entry;
use std::collections::HashMap;
use std::sync::Arc;
use datafusion::arrow;
use datafusion::arrow::array::as_string_array;
use datafusion::arrow::array::StringArray;
pub use datafusion::execution::context::ExecutionConfig;
use datafusion::execution::context::ExecutionContext;
use datafusion::physical_plan::collect;
use crate::error::{ColumnQError, QueryError};
use crate::query;
use crate::table::{self, TableSource};
use crate::table::{self, KeyValueSource, TableSource};
pub struct ColumnQ {
dfctx: ExecutionContext,
schema_map: HashMap<String, arrow::datatypes::SchemaRef>,
kv_catalog: HashMap<String, Arc<HashMap<String, String>>>,
}
impl ColumnQ {
@ -21,7 +27,11 @@ impl ColumnQ {
pub fn new_with_config(config: ExecutionConfig) -> Self {
let dfctx = ExecutionContext::with_config(config);
let schema_map = HashMap::<String, arrow::datatypes::SchemaRef>::new();
Self { dfctx, schema_map }
Self {
dfctx,
schema_map,
kv_catalog: HashMap::new(),
}
}
pub async fn load_table(&mut self, t: &TableSource) -> Result<(), ColumnQError> {
@ -33,6 +43,62 @@ impl ColumnQ {
Ok(())
}
pub async fn load_kv(&mut self, kv: KeyValueSource) -> Result<(), ColumnQError> {
use datafusion::arrow::datatypes::DataType;
let kv_entry = self.kv_catalog.entry(kv.name.clone());
let (key, value) = (kv.key.clone(), kv.value.clone());
let table = table::load(&kv.into()).await?;
let schema = table.schema();
let key_schema_idx = schema.index_of(&key)?;
if schema.field(key_schema_idx).data_type() != &DataType::Utf8 {
return Err(ColumnQError::invalid_kv_key_type());
}
let val_schema_idx = schema.index_of(&value)?;
let projections = Some(vec![key_schema_idx, val_schema_idx]);
let filters = &[];
let exec_plan = table.scan(&projections, filters, None).await?;
let batches = collect(exec_plan, self.dfctx.runtime_env()).await?;
let mut kv = HashMap::new();
for batch in batches {
let col_key = batch
.column(0)
.as_any()
.downcast_ref::<StringArray>()
.ok_or_else(ColumnQError::invalid_kv_key_type)?;
// TODO: take ownership of array data
let key_iter = col_key.iter();
let col_val = batch.column(1);
match col_val.data_type() {
DataType::Utf8 => {
let val_iter = as_string_array(col_val).iter();
key_iter
.zip(val_iter)
.for_each(|(key, val): (Option<&str>, Option<&str>)| {
// TODO: support null as value? error out on null?
if let (Some(key), Some(val)) = (key, val) {
kv.insert(key.to_string(), val.to_string());
}
});
}
other => {
todo!("unsupported type: {}", other);
}
}
}
match kv_entry {
Entry::Occupied(mut entry) => {
entry.insert(Arc::new(kv));
}
Entry::Vacant(entry) => {
entry.insert(Arc::new(kv));
}
}
Ok(())
}
pub fn schema_map(&self) -> &HashMap<String, arrow::datatypes::SchemaRef> {
&self.schema_map
}
@ -58,6 +124,14 @@ impl ColumnQ {
) -> Result<Vec<arrow::record_batch::RecordBatch>, QueryError> {
query::rest::query_table(&self.dfctx, table_name, params).await
}
pub fn kv_get(&self, kv_name: &str, key: &str) -> Result<Option<&String>, QueryError> {
let map = self
.kv_catalog
.get(kv_name)
.ok_or_else(|| QueryError::invalid_kv_name(kv_name))?;
Ok(map.get(key))
}
}
impl Default for ColumnQ {

View File

@ -87,6 +87,10 @@ impl ColumnQError {
pub fn s3_obj_missing_key() -> Self {
Self::S3Store("Missing key in S3 object list item".to_string())
}
pub fn invalid_kv_key_type() -> Self {
Self::Generic("keyvalue store key datatype should be a string".to_string())
}
}
impl From<URIReferenceError> for ColumnQError {
@ -156,4 +160,11 @@ impl QueryError {
message: format!("Failed to load table {}: {}", table_name, error),
}
}
pub fn invalid_kv_name(kv_name: &str) -> Self {
Self {
error: "invalid_kv_name".to_string(),
message: format!("keyvalue store name `{}` doesn't exist", kv_name),
}
}
}

View File

@ -300,6 +300,18 @@ pub struct TableSource {
pub batch_size: usize,
}
impl From<KeyValueSource> for TableSource {
fn from(kv: KeyValueSource) -> Self {
Self {
name: kv.name,
io_source: kv.io_source,
schema: kv.schema,
option: kv.option,
batch_size: Self::default_batch_size(),
}
}
}
impl TableSource {
pub fn new(name: impl Into<String>, source: impl Into<TableIoSource>) -> Self {
Self {
@ -497,6 +509,46 @@ pub fn parse_table_uri_arg(uri_arg: &str) -> Result<TableSource, ColumnQError> {
}
}
#[derive(Deserialize, Clone, Debug, Eq, PartialEq)]
#[serde(deny_unknown_fields)]
pub struct KeyValueSource {
pub name: String,
pub key: String,
pub value: String,
#[serde(flatten)]
pub io_source: TableIoSource,
pub schema: Option<TableSchema>,
pub option: Option<TableLoadOption>,
}
impl KeyValueSource {
pub fn new(
name: impl Into<String>,
source: impl Into<TableIoSource>,
key: impl Into<String>,
value: impl Into<String>,
) -> Self {
Self {
name: name.into(),
key: key.into(),
value: value.into(),
io_source: source.into(),
schema: None,
option: None,
}
}
pub fn with_option(mut self, option: impl Into<TableLoadOption>) -> Self {
self.option = Some(option.into());
self
}
pub fn with_schema(mut self, schema: impl Into<TableSchema>) -> Self {
self.schema = Some(schema.into());
self
}
}
#[cfg(test)]
mod tests {
use super::*;

15
roapi-http/src/api/kv.rs Normal file
View File

@ -0,0 +1,15 @@
use crate::error::ApiErrResp;
use axum::extract::{self, Extension};
use axum::response::IntoResponse;
use std::sync::Arc;
use super::HandlerCtx;
pub async fn get<H: HandlerCtx>(
Extension(ctx): extract::Extension<Arc<H>>,
extract::Path((kv_name, key)): extract::Path<(String, String)>,
) -> Result<impl IntoResponse, ApiErrResp> {
ctx.kv_get(&kv_name, &key)
.await?
.ok_or_else(|| ApiErrResp::not_found(format!("key {} not found", key)))
}

View File

@ -28,8 +28,8 @@ impl RawHandlerContext {
pub async fn new(config: &Config) -> anyhow::Result<Self> {
let mut cq = ColumnQ::new();
if config.tables.is_empty() {
anyhow::bail!("No table found in tables config");
if config.tables.is_empty() && config.kvstores.is_empty() {
anyhow::bail!("No table nor kvstore found in config");
}
for t in config.tables.iter() {
@ -38,6 +38,12 @@ impl RawHandlerContext {
info!("registered `{}` as table `{}`", t.io_source, t.name);
}
for k in config.kvstores.iter() {
info!("loading `{}` as kv store `{}`", k.io_source, k.name);
cq.load_kv(k.clone()).await?;
info!("registered `{}` as kv store `{}`", k.io_source, k.name);
}
Ok(Self { cq })
}
}
@ -69,6 +75,8 @@ pub trait HandlerCtx: Send + Sync + 'static {
table_name: &str,
params: &HashMap<String, String>,
) -> Result<Vec<arrow::record_batch::RecordBatch>, QueryError>;
async fn kv_get(&self, kv_name: &str, key: &str) -> Result<Option<String>, QueryError>;
}
#[async_trait]
@ -129,6 +137,11 @@ impl HandlerCtx for RawHandlerContext {
) -> Result<Vec<arrow::record_batch::RecordBatch>, QueryError> {
self.cq.query_rest_table(table_name, params).await
}
#[inline]
async fn kv_get(&self, kv_name: &str, key: &str) -> Result<Option<String>, QueryError> {
Ok(self.cq.kv_get(kv_name, key)?.cloned())
}
}
#[async_trait]
@ -193,6 +206,12 @@ impl HandlerCtx for ConcurrentHandlerContext {
let ctx = self.read().await;
ctx.cq.query_rest_table(table_name, params).await
}
#[inline]
async fn kv_get(&self, kv_name: &str, key: &str) -> Result<Option<String>, QueryError> {
let ctx = self.read().await;
Ok(ctx.cq.kv_get(kv_name, key)?.cloned())
}
}
#[inline]
@ -242,6 +261,7 @@ pub fn encode_record_batches(
}
pub mod graphql;
pub mod kv;
pub mod register;
pub mod rest;
pub mod routes;

View File

@ -9,6 +9,7 @@ pub fn register_app_routes<H: HandlerCtx>() -> Router {
let mut router = Router::new()
.route("/api/tables/:table_name", get(api::rest::get_table::<H>))
.route("/api/sql", post(api::sql::post::<H>))
.route("/api/kv/:kv_name/:key", get(api::kv::get::<H>))
.route("/api/graphql", post(api::graphql::post::<H>))
.route("/api/schema", get(api::schema::schema::<H>));

View File

@ -3,6 +3,7 @@ use serde_derive::Deserialize;
use anyhow::{Context, Result};
use columnq::table::parse_table_uri_arg;
use columnq::table::KeyValueSource;
use columnq::table::TableSource;
use std::fs;
@ -12,6 +13,8 @@ pub struct Config {
pub tables: Vec<TableSource>,
#[serde(default)]
pub disable_read_only: bool,
#[serde(default)]
pub kvstores: Vec<KeyValueSource>,
}
fn table_arg() -> clap::Arg<'static> {

View File

@ -25,11 +25,11 @@ where
}
impl ApiErrResp {
pub fn not_found(message: &str) -> Self {
pub fn not_found(message: impl Into<String>) -> Self {
Self {
code: http::StatusCode::NOT_FOUND,
error: "not_found".to_string(),
message: message.to_string(),
message: message.into(),
}
}

View File

@ -10,7 +10,7 @@ use tokio;
#[tokio::test]
async fn test_schema() -> Result<()> {
let json_table = helpers::get_spacex_table();
let (app, address) = helpers::test_api_app(vec![json_table]).await;
let (app, address) = helpers::test_api_app_with_tables(vec![json_table]).await;
tokio::spawn(app.run_until_stopped());
let response = helpers::http_get(&format!("{}/api/schema", address), None).await;
@ -24,7 +24,7 @@ async fn test_schema() -> Result<()> {
#[tokio::test]
async fn test_uk_cities_sql_post() -> Result<()> {
let table = helpers::get_uk_cities_table();
let (app, address) = helpers::test_api_app(vec![table]).await;
let (app, address) = helpers::test_api_app_with_tables(vec![table]).await;
tokio::spawn(app.run_until_stopped());
let response = helpers::http_post(
@ -51,7 +51,7 @@ async fn test_uk_cities_sql_post() -> Result<()> {
#[tokio::test]
async fn test_sql_invalid_post() -> Result<()> {
let table = helpers::get_uk_cities_table();
let (app, address) = helpers::test_api_app(vec![table]).await;
let (app, address) = helpers::test_api_app_with_tables(vec![table]).await;
tokio::spawn(app.run_until_stopped());
let response = helpers::http_post(&format!("{}/api/sql", address), "SELECT city FROM").await;
@ -72,7 +72,7 @@ async fn test_sql_invalid_post() -> Result<()> {
#[tokio::test]
async fn test_ubuntu_ami_sql_post() -> Result<()> {
let table = helpers::get_ubuntu_ami_table();
let (app, address) = helpers::test_api_app(vec![table]).await;
let (app, address) = helpers::test_api_app_with_tables(vec![table]).await;
tokio::spawn(app.run_until_stopped());
let response = helpers::http_post(
@ -99,7 +99,7 @@ async fn test_ubuntu_ami_sql_post() -> Result<()> {
#[tokio::test]
async fn test_rest_get() -> Result<()> {
let table = helpers::get_ubuntu_ami_table();
let (app, address) = helpers::test_api_app(vec![table]).await;
let (app, address) = helpers::test_api_app_with_tables(vec![table]).await;
tokio::spawn(app.run_until_stopped());
let accept_headers = vec![
None,
@ -146,7 +146,7 @@ async fn test_rest_get() -> Result<()> {
#[tokio::test]
async fn test_graphql_post_query_op() -> Result<()> {
let table = helpers::get_ubuntu_ami_table();
let (app, address) = helpers::test_api_app(vec![table]).await;
let (app, address) = helpers::test_api_app_with_tables(vec![table]).await;
tokio::spawn(app.run_until_stopped());
let response = helpers::http_post(
@ -192,7 +192,7 @@ async fn test_graphql_post_query_op() -> Result<()> {
#[tokio::test]
async fn test_graphql_post_selection() -> Result<()> {
let table = helpers::get_ubuntu_ami_table();
let (app, address) = helpers::test_api_app(vec![table]).await;
let (app, address) = helpers::test_api_app_with_tables(vec![table]).await;
tokio::spawn(app.run_until_stopped());
let response = helpers::http_post(
@ -236,7 +236,7 @@ async fn test_graphql_post_selection() -> Result<()> {
#[tokio::test]
async fn test_http2() -> Result<()> {
let table = helpers::get_uk_cities_table();
let (app, address) = helpers::test_api_app(vec![table]).await;
let (app, address) = helpers::test_api_app_with_tables(vec![table]).await;
tokio::spawn(app.run_until_stopped());
// ~ % curl -sI --http2-prior-knowledge localhost:8080/api/schema -o/dev/null -w '%{http_version}\n'
@ -263,3 +263,23 @@ async fn test_http2() -> Result<()> {
assert_eq!(http_version, two);
Ok(())
}
#[tokio::test]
async fn test_kvstore_get() -> Result<()> {
let store = helpers::get_spacex_launch_name_kvstore();
let (app, address) = helpers::test_api_app_with_kvstores(vec![store]).await;
tokio::spawn(app.run_until_stopped());
let response = helpers::http_get(
&format!(
"{}/api/kv/spacex_launch_name/600f9a8d8f798e2a4d5f979e",
address
),
None,
)
.await;
assert_eq!(response.status(), 200);
assert_eq!(response.text().await?, "Starlink-21 (v1.0)");
Ok(())
}

View File

@ -1,7 +1,7 @@
use std::path::PathBuf;
use columnq::datafusion::arrow;
use columnq::table::{TableColumn, TableLoadOption, TableSchema, TableSource};
use columnq::table::{KeyValueSource, TableColumn, TableLoadOption, TableSchema, TableSource};
use roapi_http::config::Config;
use roapi_http::startup::Application;
@ -12,11 +12,23 @@ pub fn test_data_path(relative_path: &str) -> String {
d.to_string_lossy().to_string()
}
pub async fn test_api_app(tables: Vec<TableSource>) -> (Application, String) {
pub async fn test_api_app_with_tables(tables: Vec<TableSource>) -> (Application, String) {
test_api_app(tables, vec![]).await
}
pub async fn test_api_app_with_kvstores(kvstores: Vec<KeyValueSource>) -> (Application, String) {
test_api_app(vec![], kvstores).await
}
pub async fn test_api_app(
tables: Vec<TableSource>,
kvstores: Vec<KeyValueSource>,
) -> (Application, String) {
let config = Config {
addr: "localhost:0".to_string().into(),
tables: tables,
tables,
disable_read_only: false,
kvstores,
};
let app = Application::build(config)
@ -110,3 +122,12 @@ pub fn get_ubuntu_ami_table() -> TableSource {
],
})
}
pub fn get_spacex_launch_name_kvstore() -> KeyValueSource {
KeyValueSource::new(
"spacex_launch_name",
test_data_path("spacex_launches.json"),
"id",
"name",
)
}