Add a disable-read-only flag to control table update action. (#156)

* Add a disable-read-only flag to control table update action.

Co-authored-by: zemel leong <zemel.leong@gmail.com>
Co-authored-by: Qingping Hou <dave2008713@gmail.com>
This commit is contained in:
zemel leong 2022-03-20 02:34:36 +08:00 committed by GitHub
parent 86b8cbf722
commit a17887bd75
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 234 additions and 63 deletions

1
Cargo.lock generated
View File

@ -2236,6 +2236,7 @@ version = "0.5.4"
dependencies = [
"anyhow",
"async-process",
"async-trait",
"axum",
"clap",
"columnq",

View File

@ -27,6 +27,7 @@ tower-http = { version = "0", features = ["cors"] }
tower-layer = "0"
tracing = "0"
pin-project = "1"
async-trait = "0"
env_logger = "0"
log = "0"

View File

@ -2,20 +2,21 @@ use axum::body::Bytes;
use axum::extract;
use axum::http::header::HeaderMap;
use axum::response::IntoResponse;
use tokio::sync::RwLock;
use std::sync::Arc;
use crate::api::{encode_record_batches, encode_type_from_hdr, HandlerContext};
use crate::api::{encode_record_batches, encode_type_from_hdr};
use crate::error::ApiErrResp;
pub async fn post(
state: extract::Extension<Arc<RwLock<HandlerContext>>>,
use super::HandlerCtx;
pub async fn post<H: HandlerCtx>(
state: extract::Extension<Arc<H>>,
headers: HeaderMap,
body: Bytes,
) -> Result<impl IntoResponse, ApiErrResp> {
let ctx = state.0.read().await;
let ctx = state.0;
let encode_type = encode_type_from_hdr(headers);
let graphq = std::str::from_utf8(&body).map_err(ApiErrResp::read_query)?;
let batches = ctx.cq.query_graphql(graphq).await?;
let batches = ctx.query_graphql(graphq).await?;
encode_record_batches(encode_type, &batches)
}

View File

@ -1,5 +1,7 @@
use std::collections::HashMap;
use std::convert::TryFrom;
use async_trait::async_trait;
use axum::body::Body;
use axum::http::header;
use axum::http::Response;
@ -7,18 +9,22 @@ use axum::response::IntoResponse;
use columnq::datafusion::arrow;
use columnq::encoding;
use columnq::encoding::ContentType;
use columnq::error::ColumnQError;
use columnq::error::QueryError;
use columnq::table::TableSource;
use columnq::ColumnQ;
use log::info;
use tokio::sync::RwLock;
use crate::config::Config;
use crate::error::ApiErrResp;
pub struct HandlerContext {
pub struct RawHandlerContext {
pub cq: ColumnQ,
// TODO: store pre serialized schema in handler context
}
impl HandlerContext {
impl RawHandlerContext {
pub async fn new(config: &Config) -> anyhow::Result<Self> {
let mut cq = ColumnQ::new();
@ -36,6 +42,123 @@ impl HandlerContext {
}
}
/// A `RawHandlerContext` wrapped in a `tokio::sync::RwLock`, so that the
/// `HandlerCtx` implementation for this type can take a write lock to load tables.
pub type ConcurrentHandlerContext = RwLock<RawHandlerContext>;
/// Interface the HTTP handlers use to reach the query engine.
///
/// Handlers and route registration are generic over this trait, so the same
/// handler code works with either the plain `RawHandlerContext` or the
/// `RwLock`-wrapped `ConcurrentHandlerContext`.
#[async_trait]
pub trait HandlerCtx: Send + Sync + 'static {
/// Returns `true` when this context type rejects table updates.
///
/// This is an associated function (no `self`), so it can be consulted while
/// building the router, before any context instance is available.
fn read_only_mode() -> bool;
/// Loads `table` into the underlying query engine.
///
/// Implementations that are read-only return an error instead of loading.
async fn load_table(&self, table: &TableSource) -> Result<(), ColumnQError>;
// FIXME: avoid clone
/// Returns an owned copy of the map from table name to Arrow schema.
async fn schema_map(&self) -> HashMap<String, arrow::datatypes::SchemaRef>;
/// Executes the GraphQL query text `query` and returns the resulting record batches.
async fn query_graphql(
&self,
query: &str,
) -> Result<Vec<arrow::record_batch::RecordBatch>, QueryError>;
/// Executes the SQL query text `query` and returns the resulting record batches.
async fn query_sql(
&self,
query: &str,
) -> Result<Vec<arrow::record_batch::RecordBatch>, QueryError>;
/// Queries the table `table_name` using the REST query parameters in `params`
/// and returns the resulting record batches.
async fn query_rest_table(
&self,
table_name: &str,
params: &HashMap<String, String>,
) -> Result<Vec<arrow::record_batch::RecordBatch>, QueryError>;
}
/// Read-only `HandlerCtx` implementation backed directly by the wrapped `ColumnQ`.
///
/// Queries are forwarded to `self.cq` without any locking; table updates are
/// always refused.
#[async_trait]
impl HandlerCtx for RawHandlerContext {
    /// This context never accepts table updates.
    #[inline]
    fn read_only_mode() -> bool {
        true
    }

    /// Always fails: loading tables is not available on the read-only context.
    #[inline]
    async fn load_table(&self, _table: &TableSource) -> Result<(), ColumnQError> {
        let reason = "Table update not supported in read only mode".to_string();
        Err(ColumnQError::Generic(reason))
    }

    /// Returns a cloned copy of the engine's table-name to schema map.
    #[inline]
    async fn schema_map(&self) -> HashMap<String, arrow::datatypes::SchemaRef> {
        let schemas = self.cq.schema_map();
        schemas.clone()
    }

    /// Forwards the GraphQL query to the wrapped engine.
    #[inline]
    async fn query_graphql(
        &self,
        query: &str,
    ) -> Result<Vec<arrow::record_batch::RecordBatch>, QueryError> {
        let batches = self.cq.query_graphql(query).await?;
        Ok(batches)
    }

    /// Forwards the SQL query to the wrapped engine.
    #[inline]
    async fn query_sql(
        &self,
        query: &str,
    ) -> Result<Vec<arrow::record_batch::RecordBatch>, QueryError> {
        let batches = self.cq.query_sql(query).await?;
        Ok(batches)
    }

    /// Forwards the REST table query to the wrapped engine.
    #[inline]
    async fn query_rest_table(
        &self,
        table_name: &str,
        params: &HashMap<String, String>,
    ) -> Result<Vec<arrow::record_batch::RecordBatch>, QueryError> {
        let batches = self.cq.query_rest_table(table_name, params).await?;
        Ok(batches)
    }
}
/// Read-write `HandlerCtx` implementation for the `RwLock`-wrapped context.
///
/// Queries and schema lookups take the read lock; `load_table` takes the write
/// lock. In every method the guard is kept alive until the engine call has
/// completed.
#[async_trait]
impl HandlerCtx for ConcurrentHandlerContext {
    /// This context accepts table updates.
    #[inline]
    fn read_only_mode() -> bool {
        false
    }

    /// Loads `table` into the engine while holding the write lock.
    #[inline]
    async fn load_table(&self, table: &TableSource) -> Result<(), ColumnQError> {
        let mut writer = self.write().await;
        writer.cq.load_table(table).await?;
        Ok(())
    }

    /// Returns a cloned copy of the schema map, taken under the read lock.
    #[inline]
    async fn schema_map(&self) -> HashMap<String, arrow::datatypes::SchemaRef> {
        let reader = self.read().await;
        let schemas = reader.cq.schema_map();
        schemas.clone()
    }

    /// Runs the GraphQL query under the read lock.
    #[inline]
    async fn query_graphql(
        &self,
        query: &str,
    ) -> Result<Vec<arrow::record_batch::RecordBatch>, QueryError> {
        let reader = self.read().await;
        let batches = reader.cq.query_graphql(query).await?;
        Ok(batches)
    }

    /// Runs the SQL query under the read lock.
    #[inline]
    async fn query_sql(
        &self,
        query: &str,
    ) -> Result<Vec<arrow::record_batch::RecordBatch>, QueryError> {
        let reader = self.read().await;
        let batches = reader.cq.query_sql(query).await?;
        Ok(batches)
    }

    /// Runs the REST table query under the read lock.
    #[inline]
    async fn query_rest_table(
        &self,
        table_name: &str,
        params: &HashMap<String, String>,
    ) -> Result<Vec<arrow::record_batch::RecordBatch>, QueryError> {
        let reader = self.read().await;
        let batches = reader.cq.query_rest_table(table_name, params).await?;
        Ok(batches)
    }
}
#[inline]
pub fn bytes_to_resp(bytes: Vec<u8>, content_type: &'static str) -> impl IntoResponse {
let mut res = Response::new(Body::from(bytes));
@ -83,10 +206,10 @@ pub fn encode_record_batches(
}
pub mod graphql;
pub mod register;
pub mod rest;
pub mod routes;
pub mod schema;
pub mod sql;
pub mod register;
pub use routes::register_app_routes;

View File

@ -4,11 +4,11 @@ use axum::extract::{Extension, Json};
use columnq::{error::ColumnQError, table::TableSource};
use log::info;
use serde::Deserialize;
use tokio::sync::{Mutex, RwLock};
use tokio::sync::Mutex;
use crate::error::ApiErrResp;
use super::HandlerContext;
use super::HandlerCtx;
#[derive(Debug, Deserialize)]
pub struct SourceConfig {
@ -17,19 +17,17 @@ pub struct SourceConfig {
pub uri: Option<String>,
}
pub async fn register_table(
Extension(state): Extension<Arc<RwLock<HandlerContext>>>,
pub async fn register_table<H: HandlerCtx>(
Extension(ctx): Extension<Arc<H>>,
Extension(tables): Extension<Arc<Mutex<HashMap<String, TableSource>>>>,
Json(body): Json<Vec<SourceConfig>>,
) -> Result<(), ApiErrResp> {
let mut ctx = state.write().await;
let mut tables = tables.lock().await;
for config in body {
if let Some(ref uri) = config.uri {
let t = TableSource::new_with_uri(&config.table_name, uri);
info!("loading `{}` as table `{}`", t.io_source, config.table_name);
ctx.cq
.load_table(&t)
ctx.load_table(&t)
.await
.map_err(ColumnQError::from)
.map_err(ApiErrResp::load_table)?;
@ -40,8 +38,7 @@ pub async fn register_table(
);
} else if let Some(t) = tables.get(&config.table_name) {
info!("Re register table {}", t.name);
ctx.cq
.load_table(t)
ctx.load_table(t)
.await
.map_err(ColumnQError::from)
.map_err(ApiErrResp::load_table)?;
@ -54,3 +51,7 @@ pub async fn register_table(
}
Ok(())
}
/// Handler for table registration requests when the server is read-only.
///
/// Takes no extractors and unconditionally answers with the
/// `ApiErrResp::read_only_mode()` error response.
pub async fn register_table_read_only() -> Result<(), ApiErrResp> {
    let rejection = ApiErrResp::read_only_mode();
    Err(rejection)
}

View File

@ -1,21 +1,20 @@
use crate::api::HandlerContext;
use crate::api::{encode_record_batches, encode_type_from_hdr};
use crate::error::ApiErrResp;
use axum::extract::{self, Extension};
use axum::http::header::HeaderMap;
use axum::response::IntoResponse;
use tokio::sync::RwLock;
use std::collections::HashMap;
use std::sync::Arc;
pub async fn get_table(
Extension(state): extract::Extension<Arc<RwLock<HandlerContext>>>,
use super::HandlerCtx;
pub async fn get_table<H: HandlerCtx>(
Extension(ctx): extract::Extension<Arc<H>>,
headers: HeaderMap,
extract::Path(table_name): extract::Path<String>,
extract::Query(params): extract::Query<HashMap<String, String>>,
) -> Result<impl IntoResponse, ApiErrResp> {
let ctx = &state.read().await;
let encode_type = encode_type_from_hdr(headers);
let batches = ctx.cq.query_rest_table(&table_name, &params).await?;
let batches = ctx.query_rest_table(&table_name, &params).await?;
encode_record_batches(encode_type, &batches)
}

View File

@ -1,15 +1,22 @@
use crate::api;
use axum::{
routing::{get, post},
Router,
};
pub fn register_app_routes() -> Router {
Router::new()
.route("/api/tables/:table_name", get(api::rest::get_table))
.route("/api/sql", post(api::sql::post))
.route("/api/graphql", post(api::graphql::post))
.route("/api/schema", get(api::schema::schema))
.route("/api/table", post(api::register::register_table))
use crate::api::{self, HandlerCtx};
/// Builds the API router for the handler context type `H`.
///
/// The query and schema endpoints are always mounted. The `POST /api/table`
/// endpoint is bound to the rejecting handler when `H::read_only_mode()` is
/// `true`, and to the real table registration handler otherwise.
pub fn register_app_routes<H: HandlerCtx>() -> Router {
    let query_routes = Router::new()
        .route("/api/tables/:table_name", get(api::rest::get_table::<H>))
        .route("/api/sql", post(api::sql::post::<H>))
        .route("/api/graphql", post(api::graphql::post::<H>))
        .route("/api/schema", get(api::schema::schema::<H>));

    // Pick the table registration handler based on the context's mode.
    if H::read_only_mode() {
        query_routes.route("/api/table", post(api::register::register_table_read_only))
    } else {
        query_routes.route("/api/table", post(api::register::register_table::<H>))
    }
}

View File

@ -1,29 +1,30 @@
use crate::api::{bytes_to_json_resp, HandlerContext};
use crate::api::bytes_to_json_resp;
use crate::error::ApiErrResp;
use axum::extract;
use axum::response::IntoResponse;
use tokio::sync::RwLock;
use std::sync::Arc;
pub async fn schema(
state: extract::Extension<Arc<RwLock<HandlerContext>>>,
use super::HandlerCtx;
pub async fn schema<H: HandlerCtx>(
state: extract::Extension<Arc<H>>,
) -> Result<impl IntoResponse, ApiErrResp> {
let ctx = state.0.read().await;
let schema = ctx.cq.schema_map();
let ctx = state.0;
let schema = ctx.schema_map().await;
let payload = serde_json::to_vec(&schema)
.map_err(columnq::error::ColumnQError::from)
.map_err(ApiErrResp::json_serialization)?;
Ok(bytes_to_json_resp(payload))
}
pub async fn get_by_table_name(
state: extract::Extension<Arc<RwLock<HandlerContext>>>,
pub async fn get_by_table_name<H: HandlerCtx>(
state: extract::Extension<Arc<H>>,
extract::Path(table_name): extract::Path<String>,
) -> Result<impl IntoResponse, ApiErrResp> {
let ctx = state.0.read().await;
let ctx = state.0;
let payload = serde_json::to_vec(
ctx.cq
.schema_map()
ctx.schema_map()
.await
.get(&table_name)
.ok_or_else(|| ApiErrResp::not_found("invalid table name"))?
.as_ref(),

View File

@ -2,20 +2,21 @@ use axum::body::Bytes;
use axum::extract;
use axum::http::header::HeaderMap;
use axum::response::IntoResponse;
use tokio::sync::RwLock;
use std::sync::Arc;
use crate::api::{encode_record_batches, encode_type_from_hdr, HandlerContext};
use crate::api::{encode_record_batches, encode_type_from_hdr};
use crate::error::ApiErrResp;
pub async fn post(
state: extract::Extension<Arc<RwLock<HandlerContext>>>,
use super::HandlerCtx;
pub async fn post<H: HandlerCtx>(
state: extract::Extension<Arc<H>>,
headers: HeaderMap,
body: Bytes,
) -> Result<impl IntoResponse, ApiErrResp> {
let ctx = state.0.read().await;
let ctx = state.0;
let encode_type = encode_type_from_hdr(headers);
let sql = std::str::from_utf8(&body).map_err(ApiErrResp::read_query)?;
let batches = ctx.cq.query_sql(sql).await?;
let batches = ctx.query_sql(sql).await?;
encode_record_batches(encode_type, &batches)
}

View File

@ -10,6 +10,8 @@ use std::fs;
/// Server configuration, populated from a config file and/or command-line arguments.
pub struct Config {
// Address to bind the HTTP server to; optional.
pub addr: Option<String>,
// Table sources the server is configured with.
pub tables: Vec<TableSource>,
// When true, the server runs in read-write mode (table updates allowed).
// `serde(default)` makes this `false` when the key is absent from the config
// file; the `--disable-read-only` command-line flag sets it to `true`.
#[serde(default)]
pub disable_read_only: bool,
}
fn table_arg() -> clap::Arg<'static> {
@ -34,6 +36,15 @@ fn address_arg() -> clap::Arg<'static> {
.short('a')
}
/// Defines the optional `--disable-read-only` / `-d` switch.
///
/// The argument takes no value; its presence on the command line is what
/// callers check for.
fn read_only_arg() -> clap::Arg<'static> {
    // The same identifier is used both as the argument id and as the long flag.
    const FLAG: &str = "disable-read-only";

    clap::Arg::new(FLAG)
        .long(FLAG)
        .short('d')
        .takes_value(false)
        .required(false)
        .help("Start roapi-http in read write mode")
}
fn config_arg() -> clap::Arg<'static> {
clap::Arg::new("config")
.help("config file path")
@ -51,7 +62,7 @@ pub fn get_configuration() -> Result<Config, anyhow::Error> {
"Create full-fledged APIs for static datasets without writing a single line of code.",
)
.arg_required_else_help(true)
.args(&[address_arg(), config_arg(), table_arg()])
.args(&[address_arg(), config_arg(), read_only_arg(), table_arg()])
.get_matches();
let mut config: Config = match matches.value_of("config") {
@ -74,5 +85,9 @@ pub fn get_configuration() -> Result<Config, anyhow::Error> {
config.addr = Some(addr.to_string());
}
if matches.is_present("disable-read-only") {
config.disable_read_only = true;
}
Ok(config)
}

View File

@ -89,6 +89,14 @@ impl ApiErrResp {
}
}
/// Builds the error response returned when a write operation is attempted
/// while the server is in read-only mode (HTTP 403 Forbidden).
pub fn read_only_mode() -> Self {
    let code = http::StatusCode::FORBIDDEN;
    let error = String::from("read_only_mode");
    let message = String::from("Write operation is not allowed in read-only mode");
    Self {
        code,
        error,
        message,
    }
}
pub fn load_table(error: ColumnQError) -> Self {
Self {
code: http::StatusCode::INTERNAL_SERVER_ERROR,

View File

@ -1,13 +1,14 @@
use std::collections::HashMap;
use axum::extract::Extension;
use axum::http::Method;
use tokio::sync::{Mutex, RwLock};
use columnq::table::TableSource;
use std::collections::HashMap;
use std::net::TcpListener;
use std::sync::Arc;
use columnq::table::TableSource;
use tokio::sync::{Mutex, RwLock};
use crate::api;
use crate::api::HandlerContext;
use crate::api::ConcurrentHandlerContext;
use crate::api::RawHandlerContext;
use crate::config::Config;
use crate::layers::HttpLoggerLayer;
@ -29,31 +30,42 @@ impl Application {
.unwrap_or_else(|| default_addr.to_string());
let listener = TcpListener::bind(addr)?;
let port = listener.local_addr().unwrap().port();
let server = axum::Server::from_tcp(listener).unwrap();
let handler_ctx = HandlerContext::new(&config)
let handler_ctx = RawHandlerContext::new(&config)
.await
.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))?;
let routes = api::routes::register_app_routes();
let cors = tower_http::cors::CorsLayer::new()
.allow_methods(vec![Method::GET, Method::POST, Method::OPTIONS])
.allow_origin(tower_http::cors::Any)
.allow_credentials(false);
let tables = config.tables.iter()
let tables = config
.tables
.iter()
.map(|t| (t.name.clone(), t.clone()))
.collect::<HashMap<String, TableSource>>();
let mut app = routes
.layer(Extension(Arc::new(RwLock::new(handler_ctx))))
let mut app = if config.disable_read_only {
let ctx_ext = Arc::new(RwLock::new(handler_ctx));
let routes = api::routes::register_app_routes::<ConcurrentHandlerContext>();
routes.layer(Extension(ctx_ext))
} else {
let ctx_ext = Arc::new(handler_ctx);
let routes = api::routes::register_app_routes::<RawHandlerContext>();
routes.layer(Extension(ctx_ext))
};
app = app
.layer(Extension(Arc::new(Mutex::new(tables))))
.layer(cors);
if log::log_enabled!(log::Level::Info) {
// only add logger layer if level >= INFO
app = app.layer(HttpLoggerLayer::new());
}
let server = axum::Server::from_tcp(listener)
.unwrap()
.serve(app.into_make_service());
let server = server.serve(app.into_make_service());
Ok(Self { port, server })
}

View File

@ -16,6 +16,7 @@ pub async fn test_api_app(tables: Vec<TableSource>) -> (Application, String) {
let config = Config {
addr: "localhost:0".to_string().into(),
tables: tables,
disable_read_only: false,
};
let app = Application::build(config)