update datafusion to 31 (#297)

This commit is contained in:
QP Hou 2023-09-30 19:24:47 -07:00 committed by GitHub
parent 32d85f4b0c
commit 2baf669841
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
17 changed files with 868 additions and 1147 deletions

View File

@ -8,7 +8,7 @@ on:
env:
# NOTE: the version is also defined in roapi_http_release.yml, columnq_cli_release.yml and Dockerfile
RUST_TC_NIGHTLY_VER: "2022-09-24"
RUST_TC_NIGHTLY_VER: "2023-09-15"
jobs:
build:

View File

@ -12,7 +12,7 @@ on:
env:
# NOTE: the version is also defined in build.yml and Dockerfile
RUST_TC_NIGHTLY_VER: "2022-09-24"
RUST_TC_NIGHTLY_VER: "2023-09-15"
jobs:
# skip tag version validation on non-release branch run

View File

@ -12,7 +12,7 @@ on:
env:
# NOTE: the version is also defined in build.yml and Dockerfile
RUST_TC_NIGHTLY_VER: "2022-09-24"
RUST_TC_NIGHTLY_VER: "2023-09-15"
jobs:
validate-release-tag:

1763
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@ -1,11 +1,11 @@
FROM instrumentisto/rust:nightly-bullseye-2022-09-24 AS builder
FROM instrumentisto/rust:nightly-bullseye-2023-09-15 AS builder
WORKDIR /roapi_src
COPY ./ /roapi_src
RUN apt-get update \
&& apt-get install --no-install-recommends -y cmake
RUN RUSTFLAGS='-C target-cpu=skylake' \
cargo +nightly install --locked --git https://github.com/roapi/roapi --branch main --bins roapi --features "simd database"
cargo +nightly install --bins roapi --features "simd database" --path roapi
FROM debian:bullseye-slim
LABEL org.opencontainers.image.source https://github.com/roapi/roapi

View File

@ -2,5 +2,10 @@
set -eux
docker run -d -p 4443:4443 fsouza/fake-gcs-server -scheme http
# Custom image - see fsouza/fake-gcs-server#1164
docker run -d -p 4443:4443 \
tustvold/fake-gcs-server \
-scheme http \
-public-host localhost:4443
echo '{"gcs_base_url": "http://localhost:4443", "disable_oauth": true, "client_email": "", "private_key": ""}' > "/tmp/gcs.json"

View File

@ -13,10 +13,10 @@ path = "src/lib.rs"
[dependencies]
# pulling arrow-schema manually to enable the serde feature.
# TODO: add serde feature in datafusion to avoid this workaround
arrow-schema = { version = "37.0.0", features = ["serde"] }
arrow-schema = { version = "46.0.0", features = ["serde"] }
datafusion = "23"
object_store = { version = "0.5.6", features = ["aws_profile", "gcp", "azure"] }
datafusion = "31"
object_store = { version = "0.7", features = ["gcp", "azure"] }
percent-encoding = "2.2.0"
url = "2.2"
@ -24,12 +24,12 @@ log = "0"
regex = "1"
lazy_static = "1"
graphql-parser = "0"
sqlparser = "0.33" # version need to be in sync with convergence
sqlparser = "0.37" # version needs to be in sync with convergence and datafusion
yup-oauth2 = { version = "6.2", default-features = false, features = [
"service_account",
] }
thiserror = "1"
serde_json = { version = "1", features = ["arbitrary_precision"] }
serde_json = { version = "1" }
serde_derive = "1"
serde = "1"
uriparse = "0"
@ -47,14 +47,16 @@ hyper-rustls = { version = "0.23.2", default-features = false, optional = true }
tokio-postgres = { version = "0.7.8", optional = true }
[dependencies.deltalake]
version = "0.15"
git = "https://github.com/delta-io/delta-rs.git"
rev = "63c14b3716428ff65e01404c6f7e62f341c98f05"
features = ["datafusion"]
default-features = false
version = "0.10.0"
features = ["datafusion-ext"]
[dependencies.connectorx]
git = "https://github.com/roapi/connector-x.git"
rev = "dd58b6a90d28b1ee7e62da859a5ba1d2d6c0b179"
version = "0.3.2-alpha.5"
rev = "1e642af502d3076476ab4bc8c7e40807059a42fe"
version = "0.3.3-alpha.1"
features = ["default", "dst_arrow"]
optional = true

View File

@ -117,14 +117,14 @@ impl ColumnQ {
}
}
};
return match object_store {
match object_store {
Ok(store) => {
let runtime_env = self.dfctx.runtime_env();
let result_store = runtime_env.register_object_store(url, store);
Ok(result_store)
}
Err(e) => Err(ColumnQError::InvalidUri(e.to_string())),
};
}
}
pub async fn load_kv(&mut self, kv: KeyValueSource) -> Result<(), ColumnQError> {
use datafusion::arrow::datatypes::DataType;

View File

@ -5,8 +5,17 @@ use crate::error::ColumnQError;
pub fn record_batches_to_bytes(
batches: &[arrow::record_batch::RecordBatch],
) -> Result<Vec<u8>, ColumnQError> {
let json_rows = arrow::json::writer::record_batches_to_json_rows(batches)?;
serde_json::to_vec(&json_rows).map_err(ColumnQError::json_parse)
let buf = Vec::new();
let mut writer = arrow::json::writer::ArrayWriter::new(buf);
// TODO: update upstream writer to take an iterator so we don't need to collect a new array
// here
writer.write_batches(
&batches
.iter()
.collect::<Vec<&arrow::record_batch::RecordBatch>>(),
)?;
writer.finish()?;
Ok(writer.into_inner())
}
#[cfg(test)]

View File

@ -1,6 +1,7 @@
use crate::error::ColumnQError;
use crate::table::TableSource;
use futures::TryStreamExt;
use log::debug;
use object_store::ObjectStore;
use percent_encoding;
use std::str::FromStr;
@ -64,7 +65,8 @@ where
Ok(reader) => {
partitions.push(partition_reader(reader)?);
}
Err(_) => {
Err(e) => {
debug!("`{path}` is not an object, try to list as a directory: {e}");
// fallback to directory listing
let paths = client
.clone()

View File

@ -58,11 +58,11 @@ pub async fn to_mem_table(
None => {
let schemas = partitions_from_table_source!(
t,
|mut r| {
let (schema, record_count) = arrow::csv::reader::infer_reader_schema(
&mut r, delimiter, None, has_header,
)?;
|r| {
let fmt = arrow::csv::reader::Format::default()
.with_delimiter(delimiter)
.with_header(has_header);
let (schema, record_count) = fmt.infer_schema(r, None)?;
if record_count > 0 {
Ok(Some(schema))
} else {
@ -83,19 +83,15 @@ pub async fn to_mem_table(
let partitions: Vec<Vec<RecordBatch>> = partitions_from_table_source!(
t,
|r| -> Result<Vec<RecordBatch>, ColumnQError> {
let csv_reader = arrow::csv::Reader::new(
r,
schema_ref.clone(),
has_header,
Some(delimiter),
batch_size,
None,
projection.cloned(),
None,
);
let mut builder = arrow::csv::reader::ReaderBuilder::new(schema_ref.clone())
.has_header(has_header)
.with_delimiter(delimiter)
.with_batch_size(batch_size);
if let Some(p) = projection {
builder = builder.with_projection(p.clone());
}
let csv_reader = builder.build(r)?;
csv_reader
.into_iter()
.map(|batch| Ok(batch?))
.collect::<Result<Vec<RecordBatch>, ColumnQError>>()
},

View File

@ -121,7 +121,7 @@ fn infer_schema(rows: &[Vec<String>]) -> Schema {
row.iter().enumerate().for_each(|(i, col_val)| {
let col_name = &col_names[i];
let col_type = infer_value_type(col_val);
let entry = col_types.entry(col_name).or_insert_with(HashSet::new);
let entry = col_types.entry(col_name).or_default();
entry.insert(col_type);
});
});

View File

@ -4,7 +4,7 @@ use std::sync::Arc;
use datafusion::arrow;
use datafusion::arrow::datatypes::Schema;
#[allow(deprecated)]
use datafusion::arrow::json::reader::{Decoder, DecoderOptions};
use datafusion::arrow::json::reader::ReaderBuilder;
use datafusion::arrow::record_batch::RecordBatch;
use serde_json::value::Value;
@ -55,49 +55,47 @@ fn json_vec_to_partition(
})?,
};
// decode to arrow record batch
#[allow(deprecated)]
// TODO: switch to RawDecoder
let decoder = Decoder::new(
Arc::new(schema.clone()),
DecoderOptions::new().with_batch_size(batch_size),
);
let mut batches = vec![];
{
// enclose values_iter in its own scope so it won't borrow schema_ref til end of this
// function
let mut values_iter: Box<dyn Iterator<Item = arrow::error::Result<Value>>> =
if array_encoded {
// convert row array to object based on schema
// TODO: support array_encoded read in upstream arrow json reader instead
Box::new(json_rows.into_iter().map(|json_row| {
let mut m = serde_json::map::Map::new();
schema.fields().iter().enumerate().try_for_each(|(i, f)| {
match json_row.get(i) {
Some(x) => {
m.insert(f.name().to_string(), x.clone());
Ok(())
}
None => Err(arrow::error::ArrowError::JsonError(format!(
"arry encoded JSON row missing column {i:?} : {json_row:?}"
))),
// TODO: batch_size setting here doesn't work because we are invoking serialize directly. It might
// be better to break up the batch ourselves.
let mut decoder = ReaderBuilder::new(Arc::new(schema.clone()))
.with_batch_size(batch_size)
.build_decoder()?;
if array_encoded {
// convert row array to object based on schema
// TODO: support array_encoded read in upstream arrow json reader instead
let objects = json_rows
.into_iter()
.map(|json_row| {
let mut m = serde_json::map::Map::new();
schema.fields().iter().enumerate().try_for_each(|(i, f)| {
match json_row.get(i) {
Some(x) => {
m.insert(f.name().to_string(), x.clone());
Ok(())
}
})?;
Ok(Value::Object(m))
}))
} else {
// no need to convert row since each row is already an object
Box::new(json_rows.into_iter().map(Ok))
};
None => Err(arrow::error::ArrowError::JsonError(format!(
"arry encoded JSON row missing column {i:?} : {json_row:?}"
))),
}
})?;
Ok(Value::Object(m))
})
.collect::<Result<Vec<Value>, ColumnQError>>()?;
while let Some(batch) = decoder.next_batch(&mut values_iter).map_err(|e| {
ColumnQError::LoadJson(format!("Failed decode JSON into Arrow record batch: {e}"))
})? {
batches.push(batch);
}
}
// TODO: avoid unnecessary collection here, update upstream json reader to take an iterator
// instead of slice
decoder.serialize(&objects)?;
} else {
// Note: serialize ignores any batch size setting, and always decodes all rows
decoder.serialize(&json_rows)?;
};
Ok((schema, batches))
let batch = decoder
.flush()?
.ok_or_else(|| ColumnQError::LoadJson("No item found".to_string()))?;
Ok((schema, vec![batch]))
}
async fn to_partitions(
@ -176,6 +174,7 @@ pub async fn to_mem_table(
#[cfg(test)]
mod tests {
use super::*;
use std::io::Write;
use datafusion::{datasource::TableProvider, prelude::SessionContext};
@ -183,6 +182,48 @@ mod tests {
/// Loads a JSON document whose single row holds a list of structs and checks
/// that the resulting table schema exposes exactly one top-level column, `foo`.
///
/// The fixture is written to a temporary file so it is loaded through
/// `to_mem_table` from a file path rather than from an in-memory value.
#[tokio::test]
async fn nested_struct_and_lists() -> Result<(), ColumnQError> {
    let json_content = r#"[
{
"foo": [
{
"bar": "1234",
"baz": 1
}
]
}
]"#;

    // `tmp_dir` must stay alive until the table is loaded: dropping it removes
    // the directory and the fixture file with it.
    let tmp_dir = tempfile::TempDir::new().unwrap();
    let tmp_file_path = tmp_dir.path().join("nested.json");
    // Borrow the path instead of cloning it; `File::create` only needs `AsRef<Path>`.
    let mut f = std::fs::File::create(&tmp_file_path).unwrap();
    writeln!(f, "{json_content}").unwrap();

    let ctx = SessionContext::new();
    let t = to_mem_table(
        &TableSource::new(
            "nested_json".to_string(),
            // Convert the lossy path straight into an owned `String` rather than
            // routing it through `format!`.
            tmp_file_path.to_string_lossy().into_owned(),
        ),
        &ctx,
    )
    .await
    .unwrap();

    let schema = t.schema();
    // Sort the discovered column names so the comparison does not depend on
    // field order.
    let mut obj_keys = schema.fields().iter().map(|f| f.name()).collect::<Vec<_>>();
    obj_keys.sort();
    assert_eq!(obj_keys, vec!["foo"]);

    Ok(())
}
#[tokio::test]
async fn spacex_launches() -> Result<(), ColumnQError> {
let ctx = SessionContext::new();
let t = to_mem_table(
&TableSource::new(
@ -191,7 +232,8 @@ mod tests {
),
&ctx,
)
.await?;
.await
.unwrap();
let schema = t.schema();
let fields = schema.fields();
@ -244,8 +286,9 @@ mod tests {
source.batch_size = 1;
let (_, p) = to_partitions(&source, &ctx).await?;
assert_eq!(p.len(), 1);
assert_eq!(p[0][0].num_rows(), source.batch_size);
assert_eq!(p[0].len(), 132);
let batch = &p[0];
assert_eq!(batch[0].num_rows(), 132);
assert_eq!(batch.len(), source.batch_size);
Ok(())
}
}

View File

@ -439,7 +439,7 @@ impl TableSource {
"sqlite" | "sqlite3" | "db" => "sqlite",
_ => {
return Err(ColumnQError::InvalidUri(format!(
"unsupported extension in uri: {uri}"
"unsupported extension {ext} in uri: {uri}"
)));
}
},

View File

@ -3,7 +3,7 @@ use std::sync::Arc;
use datafusion::arrow::datatypes::{Schema, SchemaRef};
#[allow(deprecated)]
use datafusion::arrow::json::reader::{infer_json_schema, Decoder, DecoderOptions, ValueIter};
use datafusion::arrow::json::reader::{infer_json_schema, ReaderBuilder};
use datafusion::arrow::record_batch::RecordBatch;
use crate::error::ColumnQError;
@ -19,18 +19,12 @@ fn decode_json_from_reader<R: Read>(
schema_ref: SchemaRef,
batch_size: usize,
) -> Result<Vec<RecordBatch>, ColumnQError> {
#[allow(deprecated)]
// TODO: switch to RawDecoder
let decoder = Decoder::new(
schema_ref,
DecoderOptions::new().with_batch_size(batch_size),
);
let mut reader = BufReader::new(r);
let mut value_reader = ValueIter::new(&mut reader, None);
let mut batches = vec![];
while let Some(batch) = decoder.next_batch(&mut value_reader)? {
batches.push(batch);
}
let batch_reader = ReaderBuilder::new(schema_ref)
.with_batch_size(batch_size)
.build(BufReader::new(r))?;
let batches = batch_reader.collect::<Result<Vec<RecordBatch>, _>>()?;
Ok(batches)
}

View File

@ -48,7 +48,7 @@ fn infer_schema(r: &Range<calamine::DataType>) -> Result<Schema, ColumnQError> {
for (i, col_val) in row.iter().enumerate() {
let col_name = col_names.get(i).unwrap();
let col_type = infer_value_type(col_val).unwrap();
let entry = col_types.entry(col_name).or_insert_with(HashSet::new);
let entry = col_types.entry(col_name).or_default();
entry.insert(col_type);
}
}

View File

@ -40,8 +40,11 @@ clap = { version = "3", features = ["color"] }
thiserror = "1"
anyhow = "1"
convergence = "0.11.0"
convergence-arrow = "0.11.0"
# convergence = "0.13"
# convergence-arrow = "0.13"
# patch for datafusion 31
convergence = { git = "https://github.com/roapi/convergence", rev = "95fe472e429d02f18016a0d03c884a78fa62861e" }
convergence-arrow = { git = "https://github.com/roapi/convergence", rev = "95fe472e429d02f18016a0d03c884a78fa62861e" }
[features]
default = ["rustls", "snmalloc"]