
feat: no longer load full table into ram in write by using concurrent write #2289

Open

aersam wants to merge 38 commits into main from aersam:write-iter2

Changes from 1 commit

Commits (38)
565f43d
close to compiling
aersam Mar 7, 2024
3a52bb7
still learning :)
aersam Mar 8, 2024
30a5463
some compile errors
aersam Mar 8, 2024
cde4207
another bug fix
aersam Mar 8, 2024
6743373
clippy feedback
aersam Mar 8, 2024
577442b
test compilation
aersam Mar 8, 2024
4b276a7
wip on tests
aersam Mar 8, 2024
9d022cb
Merge branch 'main' of https://github.com/aersam/delta-rs into write-…
aersam Mar 8, 2024
d1352fa
Merge branch 'main' of https://github.com/aersam/delta-rs into write-…
aersam Mar 11, 2024
d4d82ce
cleanup
aersam Mar 11, 2024
385c935
wip on fixes
aersam Mar 11, 2024
023df09
more fixes
aersam Mar 11, 2024
0397a0c
more fixes
aersam Mar 11, 2024
c83f947
fmt
aersam Mar 11, 2024
f131eb1
adjust test
aersam Mar 11, 2024
a3d5585
use into()
aersam Mar 12, 2024
965968c
we need GIL, no?
aersam Mar 13, 2024
83d398f
clippy, your so right
aersam Mar 13, 2024
98bf7ec
revert 965968cda0d3d6cace62905b5af5437960ea851a and 965968cda0d3d6cac…
aersam Mar 13, 2024
44cd5b9
Merge branch 'main' into write-iter2
aersam Mar 13, 2024
5ae3599
Merge branch 'main' of https://github.com/aersam/delta-rs into write-…
aersam Mar 14, 2024
28eba65
fmt
aersam Mar 14, 2024
c66762a
Merge branch 'write-iter2' of https://github.com/aersam/delta-rs into…
aersam Mar 14, 2024
6e742a9
Merge branch 'main' of https://github.com/aersam/delta-rs into write-…
aersam Mar 15, 2024
cf375b9
use tasks for writing
aersam Mar 15, 2024
551ab2a
Merge branch 'main' of https://github.com/aersam/delta-rs into write-…
aersam Mar 20, 2024
2143d68
Feedback from Review
aersam Mar 20, 2024
ebb9420
should now work
aersam Mar 20, 2024
97ac37e
fix test
aersam Mar 20, 2024
380d4cd
Merge branch 'main' of https://github.com/aersam/delta-rs into write-…
aersam Mar 21, 2024
a051bf7
Merge branch 'main' of https://github.com/aersam/delta-rs into write-…
aersam Mar 25, 2024
4655742
test fixews
aersam Mar 25, 2024
704d86b
Merge branch 'main' of https://github.com/aersam/delta-rs into write-…
aersam Apr 15, 2024
2b135f9
fmt
aersam Apr 15, 2024
e5f12fb
use parallel as arg
aersam Apr 15, 2024
6242179
ruff
aersam Apr 15, 2024
68ea7ed
parallel
aersam Apr 15, 2024
0bbf3b5
remove fancy union syntax
aersam Apr 15, 2024
Feedback from Review
aersam committed Mar 20, 2024
commit 2143d68e5766faa61df3d56b8550c0696c868e67
Cargo.toml: 1 change (0 additions, 1 deletion)

@@ -70,5 +70,4 @@ uuid = { version = "1" }
 async-trait = { version = "0.1" }
 futures = { version = "0.3" }
 tokio = { version = "1" }
-async-channel = { version = "2.2" }
 num_cpus = { version = "1" }
crates/core/Cargo.toml: 4 changes (2 additions, 2 deletions)

@@ -40,7 +40,7 @@ datafusion-common = { workspace = true, optional = true }
 datafusion-proto = { workspace = true, optional = true }
 datafusion-sql = { workspace = true, optional = true }
 datafusion-physical-expr = { workspace = true, optional = true }
-
+async-channel = { version = "2.2", optional = true }
 # serde
 serde = { workspace = true, features = ["derive"] }
 serde_json = { workspace = true }
@@ -66,7 +66,6 @@ tokio = { workspace = true, features = [
     "fs",
     "parking_lot",
 ] }
-async-channel = { workspace = true }
 # other deps (these should be organized and pulled into workspace.dependencies as necessary)
 cfg-if = "1"
 dashmap = "5"
@@ -122,6 +121,7 @@ datafusion = [
     "datafusion-physical-expr",
     "datafusion-sql",
     "sqlparser",
+    "async-channel",
 ]
 datafusion-ext = ["datafusion"]
 json = ["parquet/json"]
crates/core/src/operations/write.rs: 12 changes (6 additions, 6 deletions)

@@ -161,7 +161,7 @@ pub struct WriteBuilderConfig {
     /// Column names for table partitioning
     partition_columns: Option<Vec<String>>,
     /// Number of streams to run concurrently on reading
-    nr_concurrent_streams: u32,
+    concurrent_streams: u32,
     /// When using `Overwrite` mode, replace data that matches a predicate
     predicate: Option<Expression>,
     /// Size above which we will write a buffered parquet file to disk.
@@ -208,7 +208,7 @@ impl WriteBuilder {
             safe_cast: false,
             schema_mode: None,
             writer_properties: None,
-            nr_concurrent_streams: 1,
+            concurrent_streams: 1,
             commit_properties: CommitProperties::default(),
             name: None,
             description: None,
@@ -310,8 +310,8 @@ impl WriteBuilder {
     }

     /// Set the number of concurrent streams to use when writing data from a RecordBatch
-    pub fn with_nr_concurrent_streams(mut self, nr_concurrent_streams: u32) -> Self {
-        self.config.nr_concurrent_streams = nr_concurrent_streams;
+    pub fn with_concurrent_streams(mut self, concurrent_streams: u32) -> Self {
+        self.config.concurrent_streams = concurrent_streams;
         self
     }
@@ -730,15 +730,15 @@ impl std::future::IntoFuture for WriteBuilder {
             }
             Some(WriteData::RecordBatches((input_batches, _))) => {
                 let (sender, receiver) =
-                    async_channel::bounded(this.nr_concurrent_streams as usize);
+                    async_channel::bounded(this.concurrent_streams as usize);
                 producer_task = Some(tokio::task::spawn(async move {
                     for batch in input_batches {
                         sender.send(batch).await?;
                     }
                     sender.close();
                     Ok(())
                 }));
-                (0..this.nr_concurrent_streams)
+                (0..this.concurrent_streams)
                     .map(|_| {
                         let stream = receiver.clone().map(Ok);
                         Box::pin(RecordBatchStreamAdapter::new(input_schema.clone(), stream))
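The hunk above is the heart of the change: instead of materializing every RecordBatch, a producer task feeds batches into a bounded async-channel, and concurrent_streams consumers each clone the receiver and drain it into their own writer stream. The bounded capacity is what caps memory use. Below is a minimal sketch of the same fan-out pattern in Python, with asyncio.Queue standing in for the Rust async-channel; strings stand in for RecordBatches, and all names are illustrative, not from the PR.

import asyncio

async def main(concurrent_streams: int = 4) -> None:
    # Bounded queue: put() blocks once `concurrent_streams` items are queued,
    # so only a handful of batches ever sit in memory at the same time.
    queue: asyncio.Queue = asyncio.Queue(maxsize=concurrent_streams)
    done = object()  # sentinel playing the role of async-channel's close()

    async def producer() -> None:
        for i in range(16):
            await queue.put(f"batch-{i}")
        for _ in range(concurrent_streams):
            await queue.put(done)  # one shutdown signal per consumer

    async def consumer(writer_id: int) -> None:
        # Each consumer competes for whatever batch arrives next, mirroring
        # the cloned receivers that feed the parallel parquet writers.
        while (batch := await queue.get()) is not done:
            print(f"writer {writer_id} handled {batch}")

    await asyncio.gather(producer(), *(consumer(i) for i in range(concurrent_streams)))

asyncio.run(main())

Because the channel is multi-consumer, each batch is delivered to exactly one writer; raising concurrent_streams trades more open files for more write parallelism.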
python/deltalake/_internal.pyi: 3 changes (1 addition, 2 deletions)

@@ -171,15 +171,14 @@ def write_new_deltalake(
 def write_to_deltalake(
     table_uri: str,
     data: pyarrow.RecordBatchReader,
-    data_schema: pyarrow.Schema,
     partition_by: Optional[List[str]],
     mode: str,
     max_rows_per_group: int,
     schema_mode: Optional[str],
     predicate: Optional[str],
     name: Optional[str],
     description: Optional[str],
-    nr_concurrent_streams: Optional[int],
+    concurrent_streams: Optional[int],
     configuration: Optional[Mapping[str, Optional[str]]],
     storage_options: Optional[Dict[str, str]],
     writer_properties: Optional[Dict[str, Optional[str]]],
python/deltalake/writer.py: 11 changes (5 additions, 6 deletions)

@@ -129,7 +129,7 @@ def write_deltalake(
     mode: Literal["error", "append", "ignore"] = ...,
     name: Optional[str] = ...,
     description: Optional[str] = ...,
-    nr_concurrent_streams: Optional[int] = ...,
+    concurrent_streams: Optional[int] = ...,
     configuration: Optional[Mapping[str, Optional[str]]] = ...,
     overwrite_schema: bool = ...,
     schema_mode: Optional[Literal["merge", "overwrite"]] = ...,
@@ -158,7 +158,7 @@ def write_deltalake(
     mode: Literal["overwrite"],
     name: Optional[str] = ...,
     description: Optional[str] = ...,
-    nr_concurrent_streams: Optional[int] = ...,
+    concurrent_streams: Optional[int] = ...,
     configuration: Optional[Mapping[str, Optional[str]]] = ...,
     overwrite_schema: bool = ...,
     schema_mode: Optional[Literal["merge", "overwrite"]] = ...,
@@ -193,7 +193,7 @@ def write_deltalake(
     max_rows_per_group: int = 128 * 1024,
     name: Optional[str] = None,
     description: Optional[str] = None,
-    nr_concurrent_streams: Optional[int] = None,
+    concurrent_streams: Optional[int] = None,
     configuration: Optional[Mapping[str, Optional[str]]] = None,
     overwrite_schema: bool = False,
     schema_mode: Optional[Literal["merge", "overwrite"]] = None,
@@ -249,7 +249,7 @@ def write_deltalake(
             If this value is set, then min_rows_per_group should also be set.
         name: User-provided identifier for this table.
         description: User-provided description for this table.
-        nr_concurrent_streams: Number of concurrent streams to use when writing. At least nr_concurrent_streams files will be written.
+        concurrent_streams: Number of concurrent streams to use when writing. At least concurrent_streams files will be written.
         configuration: A map containing configuration options for the metadata action.
         overwrite_schema: Deprecated, use schema_mode instead.
         schema_mode: If set to "overwrite", allows replacing the schema of the table. Set to "merge" to merge with existing schema.
@@ -320,15 +320,14 @@ def write_deltalake(
         write_deltalake_rust(
             table_uri=table_uri,
             data=data,
-            data_schema=schema,
             partition_by=partition_by,
             mode=mode,
             max_rows_per_group=max_rows_per_group,
             schema_mode=schema_mode,
             predicate=predicate,
             name=name,
             description=description,
-            nr_concurrent_streams=nr_concurrent_streams,
+            concurrent_streams=concurrent_streams,
             configuration=configuration,
             storage_options=storage_options,
             writer_properties=(
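Seen from the user's side, a hedged usage sketch assembled from the signatures above: streaming a pyarrow.RecordBatchReader through write_deltalake instead of a fully materialized table. The table URI and data here are illustrative, and passing engine="rust" is an assumption based on this PR wiring concurrent_streams only into the Rust writer path.

import pyarrow as pa
from deltalake import write_deltalake

schema = pa.schema([("id", pa.int64())])
# A generator of batches: nothing forces the whole table into memory.
batches = (pa.record_batch([pa.array([i])], schema=schema) for i in range(8))
reader = pa.RecordBatchReader.from_batches(schema, batches)

# Per the docstring above, at least `concurrent_streams` files get written.
write_deltalake(
    "path/to/table",  # illustrative URI
    reader,
    mode="append",
    engine="rust",  # assumption: concurrent_streams is a Rust-writer option
    concurrent_streams=4,
)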
python/src/lib.rs: 12 changes (4 additions, 8 deletions)

@@ -1408,15 +1408,14 @@ fn write_to_deltalake(
     py: Python,
     table_uri: String,
     data: PyArrowType<ArrowArrayStreamReader>,
-    data_schema: PyArrowType<ArrowSchema>,
     mode: String,
     max_rows_per_group: i64,
     schema_mode: Option<String>,
     partition_by: Option<Vec<String>>,
     predicate: Option<String>,
     name: Option<String>,
     description: Option<String>,
-    nr_concurrent_streams: Option<u32>,
+    concurrent_streams: Option<u32>,
     configuration: Option<HashMap<String, Option<String>>>,
     storage_options: Option<HashMap<String, String>>,
     writer_properties: Option<HashMap<String, Option<String>>>,
@@ -1434,10 +1433,7 @@
         .map_err(PythonError::from)?;

     let mut builder = table
-        .write(WriteData::RecordBatches((
-            Box::new(batches),
-            Arc::new(data_schema.0),
-        )))
+        .write(WriteData::RecordBatches((Box::new(batches), data.1)))
         .with_save_mode(save_mode)
         .with_write_batch_size(max_rows_per_group as usize);
     if let Some(schema_mode) = schema_mode {
@@ -1446,8 +1442,8 @@
     if let Some(partition_columns) = partition_by {
         builder = builder.with_partition_columns(partition_columns);
     }
-    if let Some(nr_concurrent_streams) = nr_concurrent_streams {
-        builder = builder.with_nr_concurrent_streams(nr_concurrent_streams);
+    if let Some(concurrent_streams) = concurrent_streams {
+        builder = builder.with_concurrent_streams(concurrent_streams);
     }
     if let Some(writer_props) = writer_properties {
        builder = builder.with_writer_properties(
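A side note on the dropped data_schema argument: as the hunk above shows, the writer now takes the schema from the Arrow stream itself (data.1) instead of from a separate parameter, which works because a pyarrow RecordBatchReader already carries its schema across the FFI boundary. A small illustrative check:

import pyarrow as pa

schema = pa.schema([("id", pa.int64())])
reader = pa.RecordBatchReader.from_batches(schema, iter([]))
# The reader exposes its own schema; callers no longer pass it separately.
assert reader.schema == schema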