[go: nahoru, domu]

Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: no longer load full table into ram in write by using concurrent write #2289

Open
wants to merge 38 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
38 commits
Select commit Hold shift + click to select a range
565f43d
close to compiling
aersam Mar 7, 2024
3a52bb7
still learning :)
aersam Mar 8, 2024
30a5463
some compile errors
aersam Mar 8, 2024
cde4207
another bug fix
aersam Mar 8, 2024
6743373
clippy feedback
aersam Mar 8, 2024
577442b
test compilation
aersam Mar 8, 2024
4b276a7
wip on tests
aersam Mar 8, 2024
9d022cb
Merge branch 'main' of https://github.com/aersam/delta-rs into write-…
aersam Mar 8, 2024
d1352fa
Merge branch 'main' of https://github.com/aersam/delta-rs into write-…
aersam Mar 11, 2024
d4d82ce
cleanup
aersam Mar 11, 2024
385c935
wip on fixes
aersam Mar 11, 2024
023df09
more fixes
aersam Mar 11, 2024
0397a0c
more fixes
aersam Mar 11, 2024
c83f947
fmt
aersam Mar 11, 2024
f131eb1
adjust test
aersam Mar 11, 2024
a3d5585
use into()
aersam Mar 12, 2024
965968c
we need GIL, no?
aersam Mar 13, 2024
83d398f
clippy, you're so right
aersam Mar 13, 2024
98bf7ec
revert 965968cda0d3d6cace62905b5af5437960ea851a and 965968cda0d3d6cac…
aersam Mar 13, 2024
44cd5b9
Merge branch 'main' into write-iter2
aersam Mar 13, 2024
5ae3599
Merge branch 'main' of https://github.com/aersam/delta-rs into write-…
aersam Mar 14, 2024
28eba65
fmt
aersam Mar 14, 2024
c66762a
Merge branch 'write-iter2' of https://github.com/aersam/delta-rs into…
aersam Mar 14, 2024
6e742a9
Merge branch 'main' of https://github.com/aersam/delta-rs into write-…
aersam Mar 15, 2024
cf375b9
use tasks for writing
aersam Mar 15, 2024
551ab2a
Merge branch 'main' of https://github.com/aersam/delta-rs into write-…
aersam Mar 20, 2024
2143d68
Feedback from Review
aersam Mar 20, 2024
ebb9420
should now work
aersam Mar 20, 2024
97ac37e
fix test
aersam Mar 20, 2024
380d4cd
Merge branch 'main' of https://github.com/aersam/delta-rs into write-…
aersam Mar 21, 2024
a051bf7
Merge branch 'main' of https://github.com/aersam/delta-rs into write-…
aersam Mar 25, 2024
4655742
test fixes
aersam Mar 25, 2024
704d86b
Merge branch 'main' of https://github.com/aersam/delta-rs into write-…
aersam Apr 15, 2024
2b135f9
fmt
aersam Apr 15, 2024
e5f12fb
use parallel as arg
aersam Apr 15, 2024
6242179
ruff
aersam Apr 15, 2024
68ea7ed
parallel
aersam Apr 15, 2024
0bbf3b5
remove fancy union syntax
aersam Apr 15, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 6 additions & 3 deletions crates/benchmarks/src/bin/merge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,10 +71,13 @@ pub async fn convert_tpcds_web_returns(input_path: String, table_path: String) -
.await
.unwrap();

let tbl = table.collect().await.unwrap();
let _schema = tbl[0].schema().clone();

DeltaOps::try_from_uri(table_path)
.await
.unwrap()
.write(table.collect().await.unwrap())
.write(tbl.into())
.with_partition_columns(vec!["wr_returned_date_sk"])
.await
.unwrap();
Expand Down Expand Up @@ -552,7 +555,7 @@ async fn main() {
]));

let batch = RecordBatch::try_new(
schema,
schema.clone(),
vec![
Arc::new(StringArray::from(group_ids)),
Arc::new(StringArray::from(name)),
Expand All @@ -566,7 +569,7 @@ async fn main() {
DeltaOps::try_from_uri(output)
.await
.unwrap()
.write(vec![batch])
.write(batch.into())
.with_save_mode(SaveMode::Append)
.await
.unwrap();
Expand Down
3 changes: 2 additions & 1 deletion crates/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ datafusion-common = { workspace = true, optional = true }
datafusion-proto = { workspace = true, optional = true }
datafusion-sql = { workspace = true, optional = true }
datafusion-physical-expr = { workspace = true, optional = true }
async-channel = { version = "2.2", optional = true }
datafusion-functions = { workspace = true, optional = true }

# serde
Expand Down Expand Up @@ -67,7 +68,6 @@ tokio = { workspace = true, features = [
"fs",
"parking_lot",
] }

# other deps (these should be organized and pulled into workspace.dependencies as necessary)
cfg-if = "1"
dashmap = "5"
Expand Down Expand Up @@ -124,6 +124,7 @@ datafusion = [
"datafusion-sql",
"datafusion-functions",
"sqlparser",
"async-channel",
]
datafusion-ext = ["datafusion"]
json = ["parquet/json"]
Expand Down
5 changes: 3 additions & 2 deletions crates/core/src/delta_datafusion/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1686,6 +1686,7 @@ impl From<Column> for DeltaColumn {

#[cfg(test)]
mod tests {
use crate::operations::write::WriteData;
use crate::writer::test_utils::get_delta_schema;
use arrow::array::StructArray;
use arrow::datatypes::{DataType, Field, Schema};
Expand Down Expand Up @@ -1976,7 +1977,7 @@ mod tests {
.unwrap();
// write some data
let table = crate::DeltaOps(table)
.write(vec![batch.clone()])
.write(batch.clone().into())
.with_save_mode(crate::protocol::SaveMode::Append)
.await
.unwrap();
Expand Down Expand Up @@ -2040,7 +2041,7 @@ mod tests {
.unwrap();
// write some data
let table = crate::DeltaOps::new_in_memory()
.write(vec![batch.clone()])
.write(batch.clone().into())
.with_save_mode(crate::protocol::SaveMode::Append)
.await
.unwrap();
Expand Down
19 changes: 10 additions & 9 deletions crates/core/src/operations/constraints.rs
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,7 @@ mod tests {
use arrow_schema::{DataType as ArrowDataType, Field, Schema as ArrowSchema};
use datafusion_expr::{col, lit};

use crate::operations::write::WriteData;
use crate::writer::test_utils::{create_bare_table, get_arrow_schema, get_record_batch};
use crate::{DeltaOps, DeltaResult, DeltaTable};

Expand Down Expand Up @@ -244,7 +245,7 @@ mod tests {
async fn add_constraint_with_invalid_data() -> DeltaResult<()> {
let batch = get_record_batch(None, false);
let write = DeltaOps(create_bare_table())
.write(vec![batch.clone()])
.write(batch.clone().into())
.await?;
let table = DeltaOps(write);

Expand All @@ -260,7 +261,7 @@ mod tests {
async fn add_valid_constraint() -> DeltaResult<()> {
let batch = get_record_batch(None, false);
let write = DeltaOps(create_bare_table())
.write(vec![batch.clone()])
.write(batch.clone().into())
.await?;
let table = DeltaOps(write);

Expand All @@ -285,7 +286,7 @@ mod tests {
// Add constraint by providing a datafusion expression.
let batch = get_record_batch(None, false);
let write = DeltaOps(create_bare_table())
.write(vec![batch.clone()])
.write(batch.clone().into())
.await?;
let table = DeltaOps(write);

Expand Down Expand Up @@ -328,7 +329,7 @@ mod tests {
)
.unwrap();

let table = DeltaOps::new_in_memory().write(vec![batch]).await.unwrap();
let table = DeltaOps::new_in_memory().write(batch.into()).await.unwrap();

let mut table = DeltaOps(table)
.add_constraint()
Expand All @@ -351,7 +352,7 @@ mod tests {
async fn add_conflicting_named_constraint() -> DeltaResult<()> {
let batch = get_record_batch(None, false);
let write = DeltaOps(create_bare_table())
.write(vec![batch.clone()])
.write(batch.clone().into())
.await?;
let table = DeltaOps(write);

Expand All @@ -373,7 +374,7 @@ mod tests {
async fn write_data_that_violates_constraint() -> DeltaResult<()> {
let batch = get_record_batch(None, false);
let write = DeltaOps(create_bare_table())
.write(vec![batch.clone()])
.write(batch.clone().into())
.await?;

let table = DeltaOps(write)
Expand All @@ -387,7 +388,7 @@ mod tests {
Arc::new(StringArray::from(vec!["2021-02-02"])),
];
let batch = RecordBatch::try_new(get_arrow_schema(&None), invalid_values)?;
let err = table.write(vec![batch]).await;
let err = table.write(batch.into()).await;
assert!(err.is_err());
Ok(())
}
Expand All @@ -396,11 +397,11 @@ mod tests {
async fn write_data_that_does_not_violate_constraint() -> DeltaResult<()> {
let batch = get_record_batch(None, false);
let write = DeltaOps(create_bare_table())
.write(vec![batch.clone()])
.write(batch.clone().into())
.await?;
let table = DeltaOps(write);

let err = table.write(vec![batch]).await;
let err = table.write(batch.into()).await;

assert!(err.is_ok());
Ok(())
Expand Down
18 changes: 11 additions & 7 deletions crates/core/src/operations/delete.rs
Original file line number Diff line number Diff line change
Expand Up @@ -328,6 +328,7 @@ impl std::future::IntoFuture for DeleteBuilder {

#[cfg(test)]
mod tests {
use crate::operations::write::WriteData;
use crate::operations::DeltaOps;
use crate::protocol::*;
use crate::writer::test_utils::datafusion::get_data;
Expand Down Expand Up @@ -396,7 +397,7 @@ mod tests {
.unwrap();
// write some data
let table = DeltaOps(table)
.write(vec![batch.clone()])
.write(batch.into())
.with_save_mode(SaveMode::Append)
.await
.unwrap();
Expand Down Expand Up @@ -458,7 +459,7 @@ mod tests {

// write some data
let table = DeltaOps(table)
.write(vec![batch])
.write(batch.into())
.with_save_mode(SaveMode::Append)
.await
.unwrap();
Expand All @@ -482,7 +483,7 @@ mod tests {

// write some data
let table = DeltaOps(table)
.write(vec![batch])
.write(batch.into())
.with_save_mode(SaveMode::Append)
.await
.unwrap();
Expand Down Expand Up @@ -549,7 +550,7 @@ mod tests {
)
.unwrap();

DeltaOps::new_in_memory().write(vec![batch]).await.unwrap()
DeltaOps::new_in_memory().write(batch.into()).await.unwrap()
}

// Validate behaviour of greater than
Expand Down Expand Up @@ -638,7 +639,7 @@ mod tests {

// write some data
let table = DeltaOps(table)
.write(vec![batch])
.write(batch.into())
.with_save_mode(SaveMode::Append)
.await
.unwrap();
Expand Down Expand Up @@ -696,7 +697,7 @@ mod tests {

// write some data
let table = DeltaOps(table)
.write(vec![batch])
.write(batch.into())
.with_save_mode(SaveMode::Append)
.await
.unwrap();
Expand Down Expand Up @@ -765,7 +766,10 @@ mod tests {
];
let batches = vec![RecordBatch::try_new(schema.clone(), data).unwrap()];

let table = DeltaOps::new_in_memory().write(batches).await.unwrap();
let table = DeltaOps::new_in_memory()
.write(batches.into())
.await
.unwrap();

let (table, _metrics) = DeltaOps(table)
.delete()
Expand Down
7 changes: 4 additions & 3 deletions crates/core/src/operations/drop_constraints.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ impl std::future::IntoFuture for DropConstraintBuilder {
#[cfg(feature = "datafusion")]
#[cfg(test)]
mod tests {
use crate::operations::write::WriteData;
use crate::writer::test_utils::{create_bare_table, get_record_batch};
use crate::{DeltaOps, DeltaResult, DeltaTable};

Expand All @@ -121,7 +122,7 @@ mod tests {
async fn drop_valid_constraint() -> DeltaResult<()> {
let batch = get_record_batch(None, false);
let write = DeltaOps(create_bare_table())
.write(vec![batch.clone()])
.write(batch.clone().into())
.await?;
let table = DeltaOps(write);

Expand All @@ -145,7 +146,7 @@ mod tests {
async fn drop_invalid_constraint_not_existing() -> DeltaResult<()> {
let batch = get_record_batch(None, false);
let write = DeltaOps(create_bare_table())
.write(vec![batch.clone()])
.write(batch.clone().into())
.await?;

let table = DeltaOps(write)
Expand All @@ -161,7 +162,7 @@ mod tests {
async fn drop_invalid_constraint_ignore() -> DeltaResult<()> {
let batch = get_record_batch(None, false);
let write = DeltaOps(create_bare_table())
.write(vec![batch.clone()])
.write(batch.clone().into())
.await?;

let version = write.version();
Expand Down
9 changes: 7 additions & 2 deletions crates/core/src/operations/load.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ impl std::future::IntoFuture for LoadBuilder {

#[cfg(test)]
mod tests {
use crate::operations::write::WriteData;
use crate::operations::{collect_sendable_stream, DeltaOps};
use crate::writer::test_utils::{get_record_batch, TestResult};
use crate::DeltaTableBuilder;
Expand Down Expand Up @@ -115,7 +116,9 @@ mod tests {
#[tokio::test]
async fn test_write_load() -> TestResult {
let batch = get_record_batch(None, false);
let table = DeltaOps::new_in_memory().write(vec![batch.clone()]).await?;
let table = DeltaOps::new_in_memory()
.write(batch.clone().into())
.await?;

let (_table, stream) = DeltaOps(table).load().await?;
let data = collect_sendable_stream(stream).await?;
Expand Down Expand Up @@ -146,7 +149,9 @@ mod tests {
#[tokio::test]
async fn test_load_with_columns() -> TestResult {
let batch = get_record_batch(None, false);
let table = DeltaOps::new_in_memory().write(vec![batch.clone()]).await?;
let table = DeltaOps::new_in_memory()
.write(batch.clone().into())
.await?;

let (_table, stream) = DeltaOps(table).load().with_columns(["id", "value"]).await?;
let data = collect_sendable_stream(stream).await?;
Expand Down
9 changes: 5 additions & 4 deletions crates/core/src/operations/merge/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1553,6 +1553,7 @@ mod tests {
use crate::kernel::StructField;
use crate::operations::merge::generalize_filter;
use crate::operations::merge::try_construct_early_filter;
use crate::operations::write::WriteData;
use crate::operations::DeltaOps;
use crate::protocol::*;
use crate::writer::test_utils::datafusion::get_data;
Expand Down Expand Up @@ -1635,7 +1636,7 @@ mod tests {
.unwrap();
// write some data
DeltaOps(table)
.write(vec![batch.clone()])
.write(batch.clone().into())
.with_save_mode(SaveMode::Append)
.await
.unwrap()
Expand Down Expand Up @@ -2924,7 +2925,7 @@ mod tests {
.unwrap();

let table = DeltaOps(table)
.write(vec![batch.clone()])
.write(batch.into())
.with_save_mode(SaveMode::Append)
.await
.unwrap();
Expand Down Expand Up @@ -3038,7 +3039,7 @@ mod tests {
.unwrap();

let table = DeltaOps(table)
.write(vec![batch1, batch2])
.write((vec![batch1, batch2]).into())
.with_write_batch_size(2)
.with_save_mode(SaveMode::Append)
.await
Expand Down Expand Up @@ -3149,7 +3150,7 @@ mod tests {
.unwrap();

let table = DeltaOps(table)
.write(vec![batch.clone()])
.write(batch.into())
.with_save_mode(SaveMode::Append)
.await
.unwrap();
Expand Down
7 changes: 3 additions & 4 deletions crates/core/src/operations/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,7 @@ use self::{
};
#[cfg(feature = "datafusion")]
pub use ::datafusion::physical_plan::common::collect as collect_sendable_stream;
#[cfg(feature = "datafusion")]
use arrow::record_batch::RecordBatch;

use optimize::OptimizeBuilder;
use restore::RestoreBuilder;

Expand Down Expand Up @@ -137,8 +136,8 @@ impl DeltaOps {
/// Write data to Delta table
#[cfg(feature = "datafusion")]
#[must_use]
pub fn write(self, batches: impl IntoIterator<Item = RecordBatch>) -> WriteBuilder {
WriteBuilder::new(self.0.log_store, self.0.state).with_input_batches(batches)
pub fn write(self, data: write::WriteData) -> WriteBuilder {
WriteBuilder::new(self.0.log_store, self.0.state).with_data(data)
}

/// Vacuum stale files from delta table
Expand Down
4 changes: 3 additions & 1 deletion crates/core/src/operations/optimize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1270,6 +1270,8 @@ pub(super) mod zorder {

#[cfg(test)]
mod tests {
use crate::operations::write::WriteData;

use super::*;
use ::datafusion::assert_batches_eq;
use arrow_array::{Int32Array, StringArray};
Expand Down Expand Up @@ -1393,7 +1395,7 @@ pub(super) mod zorder {
.unwrap();
// write some data
let table = crate::DeltaOps::new_in_memory()
.write(vec![batch.clone()])
.write(batch.clone().into())
.with_save_mode(crate::protocol::SaveMode::Append)
.await
.unwrap();
Expand Down
Loading
Loading