improve s3 credential handling

This commit is contained in:
Qingping Hou 2021-03-20 22:18:48 -07:00 committed by QP Hou
parent 879b2b09a4
commit b58ff2fc62
4 changed files with 157 additions and 131 deletions

171
Cargo.lock generated
View File

@ -69,7 +69,7 @@ dependencies = [
"regex",
"serde",
"serde_json",
"serde_urlencoded",
"serde_urlencoded 0.7.0",
"sha-1",
"smallvec",
"time 0.2.25",
@ -199,7 +199,7 @@ dependencies = [
"regex",
"serde",
"serde_json",
"serde_urlencoded",
"serde_urlencoded 0.7.0",
"smallvec",
"socket2",
"time 0.2.25",
@ -294,7 +294,7 @@ checksum = "23b62fc65de8e4e7f52534fb52b0f3ed04746ae267519eef2a83941e8085068b"
[[package]]
name = "arrow"
version = "4.0.0-SNAPSHOT"
source = "git+https://github.com/houqp/arrow.git?rev=f7cf15749f2df907994f48ef0bfbed3001bf145e#f7cf15749f2df907994f48ef0bfbed3001bf145e"
source = "git+https://github.com/houqp/arrow.git?rev=cfec7bc607b9b564eceb352e54b21fa005a3d606#cfec7bc607b9b564eceb352e54b21fa005a3d606"
dependencies = [
"cfg_aliases",
"chrono",
@ -371,7 +371,7 @@ dependencies = [
"rand 0.8.3",
"serde",
"serde_json",
"serde_urlencoded",
"serde_urlencoded 0.7.0",
]
[[package]]
@ -604,7 +604,9 @@ dependencies = [
"regex",
"reqwest",
"rusoto_core",
"rusoto_credential",
"rusoto_s3",
"rusoto_sts",
"serde",
"serde_derive",
"serde_json",
@ -684,64 +686,6 @@ dependencies = [
"cfg-if 1.0.0",
]
[[package]]
name = "crossbeam"
version = "0.8.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fd01a6eb3daaafa260f6fc94c3a6c36390abc2080e38e3e34ced87393fb77d80"
dependencies = [
"cfg-if 1.0.0",
"crossbeam-channel",
"crossbeam-deque",
"crossbeam-epoch",
"crossbeam-queue",
"crossbeam-utils",
]
[[package]]
name = "crossbeam-channel"
version = "0.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dca26ee1f8d361640700bde38b2c37d8c22b3ce2d360e1fc1c74ea4b0aa7d775"
dependencies = [
"cfg-if 1.0.0",
"crossbeam-utils",
]
[[package]]
name = "crossbeam-deque"
version = "0.8.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "94af6efb46fef72616855b036a624cf27ba656ffc9be1b9a3c931cfc7749a9a9"
dependencies = [
"cfg-if 1.0.0",
"crossbeam-epoch",
"crossbeam-utils",
]
[[package]]
name = "crossbeam-epoch"
version = "0.9.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2584f639eb95fea8c798496315b297cf81b9b58b6d30ab066a75455333cf4b12"
dependencies = [
"cfg-if 1.0.0",
"crossbeam-utils",
"lazy_static",
"memoffset",
"scopeguard",
]
[[package]]
name = "crossbeam-queue"
version = "0.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0f6cb3c7f5b8e51bc3ebb73a2327ad4abdbd119dc13223f14f961d2f38486756"
dependencies = [
"cfg-if 1.0.0",
"crossbeam-utils",
]
[[package]]
name = "crossbeam-utils"
version = "0.8.3"
@ -797,16 +741,16 @@ dependencies = [
[[package]]
name = "datafusion"
version = "4.0.0-SNAPSHOT"
source = "git+https://github.com/houqp/arrow.git?rev=f7cf15749f2df907994f48ef0bfbed3001bf145e#f7cf15749f2df907994f48ef0bfbed3001bf145e"
source = "git+https://github.com/houqp/arrow.git?rev=cfec7bc607b9b564eceb352e54b21fa005a3d606#cfec7bc607b9b564eceb352e54b21fa005a3d606"
dependencies = [
"ahash 0.7.2",
"arrow",
"async-trait",
"chrono",
"clap 2.33.3",
"crossbeam",
"futures",
"hashbrown",
"lazy_static",
"log",
"md-5",
"num_cpus",
@ -814,17 +758,19 @@ dependencies = [
"parquet",
"paste",
"pin-project-lite",
"regex",
"rustyline",
"sha2",
"sqlparser 0.8.0",
"tokio",
"tokio-stream",
"unicode-segmentation",
]
[[package]]
name = "deltalake"
version = "0.2.1"
source = "git+https://github.com/delta-io/delta-rs.git?rev=6c8dad8c8fcd5420710dea723541a6c69251be17#6c8dad8c8fcd5420710dea723541a6c69251be17"
source = "git+https://github.com/delta-io/delta-rs.git?rev=0d8b23fb3401171f33c2f299335418abff4df606#0d8b23fb3401171f33c2f299335418abff4df606"
dependencies = [
"anyhow",
"arrow",
@ -834,13 +780,17 @@ dependencies = [
"chrono",
"clap 3.0.0-beta.2",
"env_logger",
"errno",
"futures",
"lazy_static",
"libc",
"log",
"parquet",
"regex",
"rusoto_core",
"rusoto_credential",
"rusoto_s3",
"rusoto_sts",
"serde",
"serde_json",
"thiserror",
@ -947,6 +897,27 @@ dependencies = [
"termcolor",
]
[[package]]
name = "errno"
version = "0.2.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fa68f2fb9cae9d37c9b2b3584aba698a2e97f72d7aef7b9f7aa71d8b54ce46fe"
dependencies = [
"errno-dragonfly",
"libc",
"winapi",
]
[[package]]
name = "errno-dragonfly"
version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "14ca354e36190500e1e1fb267c647932382b54053c50b14970856c0b00a35067"
dependencies = [
"gcc",
"libc",
]
[[package]]
name = "flatbuffers"
version = "0.8.3"
@ -1109,6 +1080,12 @@ dependencies = [
"slab",
]
[[package]]
name = "gcc"
version = "0.3.55"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8f5f3913fa0bfe7ee1fd8248b6b9f42a5af4b9d65ec2dd2c3c26132b950ecfc2"
[[package]]
name = "generic-array"
version = "0.14.4"
@ -1482,15 +1459,6 @@ version = "2.3.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0ee1c47aaa256ecabcaea351eae4a9b01ef39ed810004e298d2511ed284b1525"
[[package]]
name = "memoffset"
version = "0.6.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "157b4208e3059a8f9e78d559edc658e13df41410cb3ae03979c83130067fdd87"
dependencies = [
"autocfg",
]
[[package]]
name = "mime"
version = "0.3.16"
@ -1669,15 +1637,15 @@ checksum = "624a8340c38c1b80fd549087862da4ba43e08858af025b236e509b6649fc13d5"
[[package]]
name = "openssl"
version = "0.10.32"
version = "0.10.33"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "038d43985d1ddca7a9900630d8cd031b56e4794eecc2e9ea39dd17aa04399a70"
checksum = "a61075b62a23fef5a29815de7536d940aa35ce96d18ce0cc5076272db678a577"
dependencies = [
"bitflags",
"cfg-if 1.0.0",
"foreign-types",
"lazy_static",
"libc",
"once_cell",
"openssl-sys",
]
@ -1689,9 +1657,9 @@ checksum = "77af24da69f9d9341038eba93a073b1fdaaa1b788221b00a69bce9e762cb32de"
[[package]]
name = "openssl-sys"
version = "0.9.60"
version = "0.9.61"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "921fc71883267538946025deffb622905ecad223c28efbfdef9bb59a0175f3e6"
checksum = "313752393519e876837e09e1fa183ddef0be7735868dced3196f4472d536277f"
dependencies = [
"autocfg",
"cc",
@ -1762,7 +1730,7 @@ dependencies = [
[[package]]
name = "parquet"
version = "4.0.0-SNAPSHOT"
source = "git+https://github.com/houqp/arrow.git?rev=f7cf15749f2df907994f48ef0bfbed3001bf145e#f7cf15749f2df907994f48ef0bfbed3001bf145e"
source = "git+https://github.com/houqp/arrow.git?rev=cfec7bc607b9b564eceb352e54b21fa005a3d606#cfec7bc607b9b564eceb352e54b21fa005a3d606"
dependencies = [
"arrow",
"base64 0.12.3",
@ -1789,9 +1757,9 @@ dependencies = [
[[package]]
name = "paste"
version = "1.0.4"
version = "1.0.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c5d65c4d95931acda4498f675e332fcbdc9a06705cd07086c510e9b6009cd1c1"
checksum = "acbf547ad0c65e31259204bd90935776d1c693cec2f4ff7abb7a1bbbd40dfe58"
[[package]]
name = "percent-encoding"
@ -2067,9 +2035,9 @@ dependencies = [
[[package]]
name = "regex"
version = "1.4.4"
version = "1.4.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "54fd1046a3107eb58f42de31d656fee6853e5d276c455fd943742dce89fc3dd3"
checksum = "957056ecddbeba1b26965114e191d2e8589ce74db242b6ea25fc4062427a5c19"
dependencies = [
"aho-corasick",
"memchr",
@ -2125,7 +2093,7 @@ dependencies = [
"pin-project-lite",
"serde",
"serde_json",
"serde_urlencoded",
"serde_urlencoded 0.7.0",
"tokio",
"tokio-native-tls",
"url",
@ -2152,7 +2120,7 @@ dependencies = [
[[package]]
name = "roapi-http"
version = "0.1.1"
version = "0.1.2"
dependencies = [
"actix-cors",
"actix-http",
@ -2253,6 +2221,21 @@ dependencies = [
"tokio",
]
[[package]]
name = "rusoto_sts"
version = "0.46.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2f93005e0c3b9e40a424b50ca71886d2445cc19bb6cdac3ac84c2daff482eb59"
dependencies = [
"async-trait",
"bytes",
"chrono",
"futures",
"rusoto_core",
"serde_urlencoded 0.6.1",
"xml-rs",
]
[[package]]
name = "rust-argon2"
version = "0.8.3"
@ -2428,6 +2411,18 @@ dependencies = [
"serde",
]
[[package]]
name = "serde_urlencoded"
version = "0.6.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9ec5d77e2d4c73717816afac02670d5c4f534ea95ed430442cad02e7a6e32c97"
dependencies = [
"dtoa",
"itoa",
"serde",
"url",
]
[[package]]
name = "serde_urlencoded"
version = "0.7.0"
@ -2636,9 +2631,9 @@ checksum = "1e81da0851ada1f3e9d4312c704aa4f8806f0f9d69faaf8df2f3464b4a9437c2"
[[package]]
name = "syn"
version = "1.0.63"
version = "1.0.64"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8fd9bc7ccc2688b3344c2f48b9b546648b25ce0b20fc717ee7fa7981a8ca9717"
checksum = "3fd9d1e9976102a03c542daa2eff1b43f9d72306342f3f8b3ed5fb8908195d6f"
dependencies = [
"proc-macro2",
"quote",

View File

@ -34,6 +34,8 @@ futures = "0.3"
# S3
rusoto_core = { version = "0.46" }
rusoto_s3 = { version = "0.46" }
rusoto_credential = { version = "0.46" }
rusoto_sts = { version = "0.46" }
deltalake = { version = "0", features = ["s3"] }

View File

@ -28,6 +28,33 @@ pub fn parse_uri<'a>(path: &'a str) -> Result<(&'a str, &'a str), ColumnQError>
Ok((bucket, key))
}
fn new_s3_client() -> Result<rusoto_s3::S3Client, ColumnQError> {
let region = rusoto_core::Region::default();
let dispatcher = rusoto_core::HttpClient::new()
.map_err(|_| ColumnQError::S3Store("Failed to create request dispatcher".to_string()))?;
let client = match std::env::var("AWS_WEB_IDENTITY_TOKEN_FILE") {
Ok(_) => {
let provider = rusoto_sts::WebIdentityProvider::from_k8s_env();
let provider =
rusoto_credential::AutoRefreshingProvider::new(provider).map_err(|e| {
ColumnQError::S3Store(format!(
"Failed to retrieve S3 credentials with message: {}",
e.message
))
})?;
rusoto_s3::S3Client::new_with(dispatcher, provider, region)
}
Err(_) => rusoto_s3::S3Client::new_with(
dispatcher,
rusoto_core::credential::ChainProvider::new(),
region,
),
};
Ok(client)
}
enum ContinuationToken {
Value(Option<String>),
End,
@ -36,7 +63,7 @@ enum ContinuationToken {
fn list_objects<'a>(
bucket: &'a str,
key: &'a str,
) -> impl futures::Stream<Item = Result<String, ColumnQError>> + 'a {
) -> Result<impl futures::Stream<Item = Result<String, ColumnQError>> + 'a, ColumnQError> {
struct S3ListState<'a> {
bucket: &'a str,
key: String,
@ -58,51 +85,53 @@ fn list_objects<'a>(
let init_state = S3ListState {
bucket,
key,
client: rusoto_s3::S3Client::new(rusoto_core::Region::default()),
client: new_s3_client()?,
continuation_token: ContinuationToken::Value(None),
obj_iter: Vec::new().into_iter(),
};
futures::stream::unfold(init_state, |mut state| async move {
match state.obj_iter.next() {
Some(obj) => Some((obj.key.ok_or_else(ColumnQError::s3_obj_missing_key), state)),
None => match &state.continuation_token {
ContinuationToken::End => None, // terminate stream
ContinuationToken::Value(v) => {
let list_req = rusoto_s3::ListObjectsV2Request {
bucket: state.bucket.to_string(),
prefix: Some(state.key.clone()),
start_after: Some(state.key.clone()),
continuation_token: v.clone(),
..Default::default()
};
Ok(futures::stream::unfold(
init_state,
|mut state| async move {
match state.obj_iter.next() {
Some(obj) => Some((obj.key.ok_or_else(ColumnQError::s3_obj_missing_key), state)),
None => match &state.continuation_token {
ContinuationToken::End => None, // terminate stream
ContinuationToken::Value(v) => {
let list_req = rusoto_s3::ListObjectsV2Request {
bucket: state.bucket.to_string(),
prefix: Some(state.key.clone()),
start_after: Some(state.key.clone()),
continuation_token: v.clone(),
..Default::default()
};
let list_resp = match state.client.list_objects_v2(list_req).await {
Ok(res) => res,
Err(e) => {
return Some((
Err(ColumnQError::S3Store(format!(
"Failed to list bucket: {}",
e
))),
state,
));
}
};
state.continuation_token = list_resp
.next_continuation_token
.map(|t| ContinuationToken::Value(Some(t)))
.unwrap_or(ContinuationToken::End);
let list_resp = match state.client.list_objects_v2(list_req).await {
Ok(res) => res,
Err(e) => {
return Some((
Err(ColumnQError::S3Store(format!(
"Failed to list bucket: {}",
e
))),
state,
));
}
};
state.continuation_token = list_resp
.next_continuation_token
.map(|t| ContinuationToken::Value(Some(t)))
.unwrap_or(ContinuationToken::End);
state.obj_iter = list_resp.contents.unwrap_or_else(Vec::new).into_iter();
state
.obj_iter
.next()
.map(|obj| (obj.key.ok_or_else(ColumnQError::s3_obj_missing_key), state))
}
},
}
})
state.obj_iter = list_resp.contents.unwrap_or_else(Vec::new).into_iter();
state.obj_iter.next().map(|obj| {
(obj.key.ok_or_else(ColumnQError::s3_obj_missing_key), state)
})
}
},
}
},
))
}
pub async fn partition_key_to_reader(
@ -152,7 +181,7 @@ where
I: Iterator<Item = &'a str>,
F: FnMut(std::io::Cursor<Vec<u8>>) -> Result<T, ColumnQError>,
{
let client = rusoto_s3::S3Client::new(rusoto_core::Region::default());
let client = new_s3_client()?;
let mut partitions = vec![];
for s3_path in path_iter {
@ -184,7 +213,7 @@ where
}
Err(_) => {
// fallback to directory listing
let mut key_stream = Box::pin(list_objects(bucket, key));
let mut key_stream = Box::pin(list_objects(bucket, key)?);
// TODO: fetch s3 objects concurrently
while let Some(item) = key_stream.next().await {
let partition_key = item?;

View File

@ -1,6 +1,6 @@
[package]
name = "roapi-http"
version = "0.1.1"
version = "0.1.2"
authors = ["Qingping Hou <dave2008713@gmail.com>"]
edition = "2018"