From b58ff2fc62b50158073177f3b34e09727416333f Mon Sep 17 00:00:00 2001 From: Qingping Hou Date: Sat, 20 Mar 2021 22:18:48 -0700 Subject: [PATCH] improve s3 credential handling --- Cargo.lock | 171 ++++++++++++++++++++---------------------- columnq/Cargo.toml | 2 + columnq/src/io/s3.rs | 113 +++++++++++++++++----------- roapi-http/Cargo.toml | 2 +- 4 files changed, 157 insertions(+), 131 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 64911f5..2d4d70f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -69,7 +69,7 @@ dependencies = [ "regex", "serde", "serde_json", - "serde_urlencoded", + "serde_urlencoded 0.7.0", "sha-1", "smallvec", "time 0.2.25", @@ -199,7 +199,7 @@ dependencies = [ "regex", "serde", "serde_json", - "serde_urlencoded", + "serde_urlencoded 0.7.0", "smallvec", "socket2", "time 0.2.25", @@ -294,7 +294,7 @@ checksum = "23b62fc65de8e4e7f52534fb52b0f3ed04746ae267519eef2a83941e8085068b" [[package]] name = "arrow" version = "4.0.0-SNAPSHOT" -source = "git+https://github.com/houqp/arrow.git?rev=f7cf15749f2df907994f48ef0bfbed3001bf145e#f7cf15749f2df907994f48ef0bfbed3001bf145e" +source = "git+https://github.com/houqp/arrow.git?rev=cfec7bc607b9b564eceb352e54b21fa005a3d606#cfec7bc607b9b564eceb352e54b21fa005a3d606" dependencies = [ "cfg_aliases", "chrono", @@ -371,7 +371,7 @@ dependencies = [ "rand 0.8.3", "serde", "serde_json", - "serde_urlencoded", + "serde_urlencoded 0.7.0", ] [[package]] @@ -604,7 +604,9 @@ dependencies = [ "regex", "reqwest", "rusoto_core", + "rusoto_credential", "rusoto_s3", + "rusoto_sts", "serde", "serde_derive", "serde_json", @@ -684,64 +686,6 @@ dependencies = [ "cfg-if 1.0.0", ] -[[package]] -name = "crossbeam" -version = "0.8.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fd01a6eb3daaafa260f6fc94c3a6c36390abc2080e38e3e34ced87393fb77d80" -dependencies = [ - "cfg-if 1.0.0", - "crossbeam-channel", - "crossbeam-deque", - "crossbeam-epoch", - "crossbeam-queue", - "crossbeam-utils", -] - -[[package]] -name = "crossbeam-channel" -version = "0.5.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dca26ee1f8d361640700bde38b2c37d8c22b3ce2d360e1fc1c74ea4b0aa7d775" -dependencies = [ - "cfg-if 1.0.0", - "crossbeam-utils", -] - -[[package]] -name = "crossbeam-deque" -version = "0.8.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "94af6efb46fef72616855b036a624cf27ba656ffc9be1b9a3c931cfc7749a9a9" -dependencies = [ - "cfg-if 1.0.0", - "crossbeam-epoch", - "crossbeam-utils", -] - -[[package]] -name = "crossbeam-epoch" -version = "0.9.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2584f639eb95fea8c798496315b297cf81b9b58b6d30ab066a75455333cf4b12" -dependencies = [ - "cfg-if 1.0.0", - "crossbeam-utils", - "lazy_static", - "memoffset", - "scopeguard", -] - -[[package]] -name = "crossbeam-queue" -version = "0.3.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0f6cb3c7f5b8e51bc3ebb73a2327ad4abdbd119dc13223f14f961d2f38486756" -dependencies = [ - "cfg-if 1.0.0", - "crossbeam-utils", -] - [[package]] name = "crossbeam-utils" version = "0.8.3" @@ -797,16 +741,16 @@ dependencies = [ [[package]] name = "datafusion" version = "4.0.0-SNAPSHOT" -source = "git+https://github.com/houqp/arrow.git?rev=f7cf15749f2df907994f48ef0bfbed3001bf145e#f7cf15749f2df907994f48ef0bfbed3001bf145e" +source = "git+https://github.com/houqp/arrow.git?rev=cfec7bc607b9b564eceb352e54b21fa005a3d606#cfec7bc607b9b564eceb352e54b21fa005a3d606" dependencies = [ "ahash 0.7.2", "arrow", "async-trait", "chrono", "clap 2.33.3", - "crossbeam", "futures", "hashbrown", + "lazy_static", "log", "md-5", "num_cpus", @@ -814,17 +758,19 @@ dependencies = [ "parquet", "paste", "pin-project-lite", + "regex", "rustyline", "sha2", "sqlparser 0.8.0", "tokio", + "tokio-stream", "unicode-segmentation", ] [[package]] name = "deltalake" version = "0.2.1" -source = "git+https://github.com/delta-io/delta-rs.git?rev=6c8dad8c8fcd5420710dea723541a6c69251be17#6c8dad8c8fcd5420710dea723541a6c69251be17" +source = "git+https://github.com/delta-io/delta-rs.git?rev=0d8b23fb3401171f33c2f299335418abff4df606#0d8b23fb3401171f33c2f299335418abff4df606" dependencies = [ "anyhow", "arrow", @@ -834,13 +780,17 @@ dependencies = [ "chrono", "clap 3.0.0-beta.2", "env_logger", + "errno", "futures", "lazy_static", + "libc", "log", "parquet", "regex", "rusoto_core", + "rusoto_credential", "rusoto_s3", + "rusoto_sts", "serde", "serde_json", "thiserror", @@ -947,6 +897,27 @@ dependencies = [ "termcolor", ] +[[package]] +name = "errno" +version = "0.2.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fa68f2fb9cae9d37c9b2b3584aba698a2e97f72d7aef7b9f7aa71d8b54ce46fe" +dependencies = [ + "errno-dragonfly", + "libc", + "winapi", +] + +[[package]] +name = "errno-dragonfly" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "14ca354e36190500e1e1fb267c647932382b54053c50b14970856c0b00a35067" +dependencies = [ + "gcc", + "libc", +] + [[package]] name = "flatbuffers" version = "0.8.3" @@ -1109,6 +1080,12 @@ dependencies = [ "slab", ] +[[package]] +name = "gcc" +version = "0.3.55" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8f5f3913fa0bfe7ee1fd8248b6b9f42a5af4b9d65ec2dd2c3c26132b950ecfc2" + [[package]] name = "generic-array" version = "0.14.4" @@ -1482,15 +1459,6 @@ version = "2.3.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0ee1c47aaa256ecabcaea351eae4a9b01ef39ed810004e298d2511ed284b1525" -[[package]] -name = "memoffset" -version = "0.6.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "157b4208e3059a8f9e78d559edc658e13df41410cb3ae03979c83130067fdd87" -dependencies = [ - "autocfg", -] - [[package]] name = "mime" version = "0.3.16" @@ -1669,15 +1637,15 @@ checksum = "624a8340c38c1b80fd549087862da4ba43e08858af025b236e509b6649fc13d5" [[package]] name = "openssl" -version = "0.10.32" +version = "0.10.33" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "038d43985d1ddca7a9900630d8cd031b56e4794eecc2e9ea39dd17aa04399a70" +checksum = "a61075b62a23fef5a29815de7536d940aa35ce96d18ce0cc5076272db678a577" dependencies = [ "bitflags", "cfg-if 1.0.0", "foreign-types", - "lazy_static", "libc", + "once_cell", "openssl-sys", ] @@ -1689,9 +1657,9 @@ checksum = "77af24da69f9d9341038eba93a073b1fdaaa1b788221b00a69bce9e762cb32de" [[package]] name = "openssl-sys" -version = "0.9.60" +version = "0.9.61" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "921fc71883267538946025deffb622905ecad223c28efbfdef9bb59a0175f3e6" +checksum = "313752393519e876837e09e1fa183ddef0be7735868dced3196f4472d536277f" dependencies = [ "autocfg", "cc", @@ -1762,7 +1730,7 @@ dependencies = [ [[package]] name = "parquet" version = "4.0.0-SNAPSHOT" -source = "git+https://github.com/houqp/arrow.git?rev=f7cf15749f2df907994f48ef0bfbed3001bf145e#f7cf15749f2df907994f48ef0bfbed3001bf145e" +source = "git+https://github.com/houqp/arrow.git?rev=cfec7bc607b9b564eceb352e54b21fa005a3d606#cfec7bc607b9b564eceb352e54b21fa005a3d606" dependencies = [ "arrow", "base64 0.12.3", @@ -1789,9 +1757,9 @@ dependencies = [ [[package]] name = "paste" -version = "1.0.4" +version = "1.0.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c5d65c4d95931acda4498f675e332fcbdc9a06705cd07086c510e9b6009cd1c1" +checksum = "acbf547ad0c65e31259204bd90935776d1c693cec2f4ff7abb7a1bbbd40dfe58" [[package]] name = "percent-encoding" @@ -2067,9 +2035,9 @@ dependencies = [ [[package]] name = "regex" -version = "1.4.4" +version = "1.4.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "54fd1046a3107eb58f42de31d656fee6853e5d276c455fd943742dce89fc3dd3" +checksum = "957056ecddbeba1b26965114e191d2e8589ce74db242b6ea25fc4062427a5c19" dependencies = [ "aho-corasick", "memchr", @@ -2125,7 +2093,7 @@ dependencies = [ "pin-project-lite", "serde", "serde_json", - "serde_urlencoded", + "serde_urlencoded 0.7.0", "tokio", "tokio-native-tls", "url", @@ -2152,7 +2120,7 @@ dependencies = [ [[package]] name = "roapi-http" -version = "0.1.1" +version = "0.1.2" dependencies = [ "actix-cors", "actix-http", @@ -2253,6 +2221,21 @@ dependencies = [ "tokio", ] +[[package]] +name = "rusoto_sts" +version = "0.46.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2f93005e0c3b9e40a424b50ca71886d2445cc19bb6cdac3ac84c2daff482eb59" +dependencies = [ + "async-trait", + "bytes", + "chrono", + "futures", + "rusoto_core", + "serde_urlencoded 0.6.1", + "xml-rs", +] + [[package]] name = "rust-argon2" version = "0.8.3" @@ -2428,6 +2411,18 @@ dependencies = [ "serde", ] +[[package]] +name = "serde_urlencoded" +version = "0.6.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9ec5d77e2d4c73717816afac02670d5c4f534ea95ed430442cad02e7a6e32c97" +dependencies = [ + "dtoa", + "itoa", + "serde", + "url", +] + [[package]] name = "serde_urlencoded" version = "0.7.0" @@ -2636,9 +2631,9 @@ checksum = "1e81da0851ada1f3e9d4312c704aa4f8806f0f9d69faaf8df2f3464b4a9437c2" [[package]] name = "syn" -version = "1.0.63" +version = "1.0.64" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8fd9bc7ccc2688b3344c2f48b9b546648b25ce0b20fc717ee7fa7981a8ca9717" +checksum = "3fd9d1e9976102a03c542daa2eff1b43f9d72306342f3f8b3ed5fb8908195d6f" dependencies = [ "proc-macro2", "quote", diff --git a/columnq/Cargo.toml b/columnq/Cargo.toml index 256b6d2..8caeeae 100644 --- a/columnq/Cargo.toml +++ b/columnq/Cargo.toml @@ -34,6 +34,8 @@ futures = "0.3" # S3 rusoto_core = { version = "0.46" } rusoto_s3 = { version = "0.46" } +rusoto_credential = { version = "0.46" } +rusoto_sts = { version = "0.46" } deltalake = { version = "0", features = ["s3"] } diff --git a/columnq/src/io/s3.rs b/columnq/src/io/s3.rs index 9485e7f..bab89d4 100644 --- a/columnq/src/io/s3.rs +++ b/columnq/src/io/s3.rs @@ -28,6 +28,33 @@ pub fn parse_uri<'a>(path: &'a str) -> Result<(&'a str, &'a str), ColumnQError> Ok((bucket, key)) } +fn new_s3_client() -> Result { + let region = rusoto_core::Region::default(); + let dispatcher = rusoto_core::HttpClient::new() + .map_err(|_| ColumnQError::S3Store("Failed to create request dispatcher".to_string()))?; + + let client = match std::env::var("AWS_WEB_IDENTITY_TOKEN_FILE") { + Ok(_) => { + let provider = rusoto_sts::WebIdentityProvider::from_k8s_env(); + let provider = + rusoto_credential::AutoRefreshingProvider::new(provider).map_err(|e| { + ColumnQError::S3Store(format!( + "Failed to retrieve S3 credentials with message: {}", + e.message + )) + })?; + rusoto_s3::S3Client::new_with(dispatcher, provider, region) + } + Err(_) => rusoto_s3::S3Client::new_with( + dispatcher, + rusoto_core::credential::ChainProvider::new(), + region, + ), + }; + + Ok(client) +} + enum ContinuationToken { Value(Option), End, @@ -36,7 +63,7 @@ enum ContinuationToken { fn list_objects<'a>( bucket: &'a str, key: &'a str, -) -> impl futures::Stream> + 'a { +) -> Result> + 'a, ColumnQError> { struct S3ListState<'a> { bucket: &'a str, key: String, @@ -58,51 +85,53 @@ fn list_objects<'a>( let init_state = S3ListState { bucket, key, - client: rusoto_s3::S3Client::new(rusoto_core::Region::default()), + client: new_s3_client()?, continuation_token: ContinuationToken::Value(None), obj_iter: Vec::new().into_iter(), }; - futures::stream::unfold(init_state, |mut state| async move { - match state.obj_iter.next() { - Some(obj) => Some((obj.key.ok_or_else(ColumnQError::s3_obj_missing_key), state)), - None => match &state.continuation_token { - ContinuationToken::End => None, // terminate stream - ContinuationToken::Value(v) => { - let list_req = rusoto_s3::ListObjectsV2Request { - bucket: state.bucket.to_string(), - prefix: Some(state.key.clone()), - start_after: Some(state.key.clone()), - continuation_token: v.clone(), - ..Default::default() - }; + Ok(futures::stream::unfold( + init_state, + |mut state| async move { + match state.obj_iter.next() { + Some(obj) => Some((obj.key.ok_or_else(ColumnQError::s3_obj_missing_key), state)), + None => match &state.continuation_token { + ContinuationToken::End => None, // terminate stream + ContinuationToken::Value(v) => { + let list_req = rusoto_s3::ListObjectsV2Request { + bucket: state.bucket.to_string(), + prefix: Some(state.key.clone()), + start_after: Some(state.key.clone()), + continuation_token: v.clone(), + ..Default::default() + }; - let list_resp = match state.client.list_objects_v2(list_req).await { - Ok(res) => res, - Err(e) => { - return Some(( - Err(ColumnQError::S3Store(format!( - "Failed to list bucket: {}", - e - ))), - state, - )); - } - }; - state.continuation_token = list_resp - .next_continuation_token - .map(|t| ContinuationToken::Value(Some(t))) - .unwrap_or(ContinuationToken::End); + let list_resp = match state.client.list_objects_v2(list_req).await { + Ok(res) => res, + Err(e) => { + return Some(( + Err(ColumnQError::S3Store(format!( + "Failed to list bucket: {}", + e + ))), + state, + )); + } + }; + state.continuation_token = list_resp + .next_continuation_token + .map(|t| ContinuationToken::Value(Some(t))) + .unwrap_or(ContinuationToken::End); - state.obj_iter = list_resp.contents.unwrap_or_else(Vec::new).into_iter(); - state - .obj_iter - .next() - .map(|obj| (obj.key.ok_or_else(ColumnQError::s3_obj_missing_key), state)) - } - }, - } - }) + state.obj_iter = list_resp.contents.unwrap_or_else(Vec::new).into_iter(); + state.obj_iter.next().map(|obj| { + (obj.key.ok_or_else(ColumnQError::s3_obj_missing_key), state) + }) + } + }, + } + }, + )) } pub async fn partition_key_to_reader( @@ -152,7 +181,7 @@ where I: Iterator, F: FnMut(std::io::Cursor>) -> Result, { - let client = rusoto_s3::S3Client::new(rusoto_core::Region::default()); + let client = new_s3_client()?; let mut partitions = vec![]; for s3_path in path_iter { @@ -184,7 +213,7 @@ where } Err(_) => { // fallback to directory listing - let mut key_stream = Box::pin(list_objects(bucket, key)); + let mut key_stream = Box::pin(list_objects(bucket, key)?); // TODO: fetch s3 objects concurrently while let Some(item) = key_stream.next().await { let partition_key = item?; diff --git a/roapi-http/Cargo.toml b/roapi-http/Cargo.toml index 8ae290f..99a585a 100644 --- a/roapi-http/Cargo.toml +++ b/roapi-http/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "roapi-http" -version = "0.1.1" +version = "0.1.2" authors = ["Qingping Hou "] edition = "2018"