feat: no longer load full table into ram in write by using concurrent write #2289

Status: Open. Wants to merge 38 commits into base: main.
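The core idea of the change: instead of collecting every record batch and wrapping the lot in a MemoryExec, the write path now represents its input as a set of record-batch streams and drains each stream in its own tokio task, so memory use is bounded by the batches currently in flight rather than by table size. Below is a minimal, self-contained sketch of that task-per-stream pattern (plain strings stand in for record batches; every name here is illustrative, not the delta-rs API):

```rust
use futures::StreamExt;

#[tokio::main]
async fn main() {
    // Stand-ins for DataFusion's SendableRecordBatchStream: one stream per partition.
    let streams: Vec<_> = (0..4)
        .map(|p| futures::stream::iter((0..3).map(move |i| format!("batch {i} of partition {p}"))))
        .collect();

    // One writer task per stream, so each task only holds the batch it is
    // currently processing, never the whole input.
    let mut tasks = Vec::new();
    for mut stream in streams {
        tasks.push(tokio::task::spawn(async move {
            let mut written = 0usize;
            while let Some(_batch) = stream.next().await {
                // A real writer would cast, validate, and flush the batch here.
                written += 1;
            }
            written
        }));
    }

    // Gather the per-task results, mirroring how the PR collects Add actions.
    for task in tasks {
        println!("wrote {} batches", task.await.expect("writer task panicked"));
    }
}
```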

Commits (38):

565f43d  close to compiling (aersam, Mar 7, 2024)
3a52bb7  still learning :) (aersam, Mar 8, 2024)
30a5463  some compile errors (aersam, Mar 8, 2024)
cde4207  another bug fix (aersam, Mar 8, 2024)
6743373  clippy feedback (aersam, Mar 8, 2024)
577442b  test compilation (aersam, Mar 8, 2024)
4b276a7  wip on tests (aersam, Mar 8, 2024)
9d022cb  Merge branch 'main' of https://github.com/aersam/delta-rs into write-… (aersam, Mar 8, 2024)
d1352fa  Merge branch 'main' of https://github.com/aersam/delta-rs into write-… (aersam, Mar 11, 2024)
d4d82ce  cleanup (aersam, Mar 11, 2024)
385c935  wip on fixes (aersam, Mar 11, 2024)
023df09  more fixes (aersam, Mar 11, 2024)
0397a0c  more fixes (aersam, Mar 11, 2024)
c83f947  fmt (aersam, Mar 11, 2024)
f131eb1  adjust test (aersam, Mar 11, 2024)
a3d5585  use into() (aersam, Mar 12, 2024)
965968c  we need GIL, no? (aersam, Mar 13, 2024)
83d398f  clippy, your so right (aersam, Mar 13, 2024)
98bf7ec  revert 965968cda0d3d6cace62905b5af5437960ea851a and 965968cda0d3d6cac… (aersam, Mar 13, 2024)
44cd5b9  Merge branch 'main' into write-iter2 (aersam, Mar 13, 2024)
5ae3599  Merge branch 'main' of https://github.com/aersam/delta-rs into write-… (aersam, Mar 14, 2024)
28eba65  fmt (aersam, Mar 14, 2024)
c66762a  Merge branch 'write-iter2' of https://github.com/aersam/delta-rs into… (aersam, Mar 14, 2024)
6e742a9  Merge branch 'main' of https://github.com/aersam/delta-rs into write-… (aersam, Mar 15, 2024)
cf375b9  use tasks for writing (aersam, Mar 15, 2024)
551ab2a  Merge branch 'main' of https://github.com/aersam/delta-rs into write-… (aersam, Mar 20, 2024)
2143d68  Feedback from Review (aersam, Mar 20, 2024)
ebb9420  should now work (aersam, Mar 20, 2024)
97ac37e  fix test (aersam, Mar 20, 2024)
380d4cd  Merge branch 'main' of https://github.com/aersam/delta-rs into write-… (aersam, Mar 21, 2024)
a051bf7  Merge branch 'main' of https://github.com/aersam/delta-rs into write-… (aersam, Mar 25, 2024)
4655742  test fixews (aersam, Mar 25, 2024)
704d86b  Merge branch 'main' of https://github.com/aersam/delta-rs into write-… (aersam, Apr 15, 2024)
2b135f9  fmt (aersam, Apr 15, 2024)
e5f12fb  use parallel as arg (aersam, Apr 15, 2024)
6242179  ruff (aersam, Apr 15, 2024)
68ea7ed  parallel (aersam, Apr 15, 2024)
0bbf3b5  remove fancy union syntax (aersam, Apr 15, 2024)
Changes shown below are from 1 commit: d4d82ce9f74335d1789099edd197ddedb6182c4d "cleanup" (aersam committed Mar 11, 2024)
crates/core/src/operations/write.rs: 171 changes (65 additions, 106 deletions)
@@ -34,12 +34,13 @@ use arrow_array::RecordBatch;
 use arrow_cast::can_cast_types;
 use arrow_schema::{ArrowError, DataType, Fields, SchemaRef as ArrowSchemaRef};
 use datafusion::execution::context::{SessionContext, SessionState, TaskContext};
 use datafusion::execution::SendableRecordBatchStream;
 use datafusion::physical_expr::create_physical_expr;
 use datafusion::physical_plan::filter::FilterExec;
-use datafusion::physical_plan::{memory::MemoryExec, ExecutionPlan};
+use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
+use datafusion::physical_plan::ExecutionPlan;
 use datafusion_common::DFSchema;
 use datafusion_expr::Expr;
 use futures::future::BoxFuture;
 use futures::StreamExt;
 use parquet::file::properties::WriterProperties;
@@ -337,28 +338,51 @@ async fn check_preconditions(
     }
 }
 
+fn plan_to_streams(
+    state: SessionState,
+    plan: Arc<dyn ExecutionPlan>,
+) -> Result<Vec<SendableRecordBatchStream>, DeltaTableError> {
+    let task_ctx = Arc::new(TaskContext::from(&state));
+    let mut result = Vec::new();
+    for i in 0..plan.output_partitioning().partition_count() {
+        let stream = plan.execute(i, task_ctx.clone())?;
+        result.push(stream);
+    }
+    return Ok(result);
+}
+
 #[allow(clippy::too_many_arguments)]
 async fn write_execution_plan_with_predicate(
     predicate: Option<Expr>,
     snapshot: Option<&DeltaTableState>,
-    state: SessionState,
-    plan: Arc<dyn ExecutionPlan>,
+    write_data: Vec<SendableRecordBatchStream>,
     partition_columns: Vec<String>,
     object_store: ObjectStoreRef,
     target_file_size: Option<usize>,
     write_batch_size: Option<usize>,
     writer_properties: Option<WriterProperties>,
     safe_cast: bool,
     schema_mode: Option<SchemaMode>,
+    target_schema: Option<ArrowSchemaRef>,
 ) -> DeltaResult<Vec<Action>> {
-    let schema: ArrowSchemaRef = if schema_mode.is_some() {
-        plan.schema()
-    } else {
-        snapshot
-            .and_then(|s| s.input_schema().ok())
-            .unwrap_or(plan.schema())
+    let target_schema = match target_schema {
+        Some(schema) => schema,
+        None => {
+            if write_data.is_empty() {
+                return Err(DeltaTableError::Generic(
+                    "No schema provided and no data to infer schema from".to_string(),
+                ));
+            }
+            let write_schema = write_data[0].schema().clone();
+            if schema_mode.is_some() {
+                write_schema
+            } else {
+                snapshot
+                    .and_then(|s| s.input_schema().ok())
+                    .unwrap_or(write_schema)
+            }
+        }
     };
 
     let checker = if let Some(snapshot) = snapshot {
         DeltaDataChecker::new(snapshot)
     } else {
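
The plan_to_streams helper added above just asks the physical plan for one stream per output partition. For context, here is a hedged, standalone sketch of the same DataFusion calls against a toy in-memory table (the table, query, and data are invented; the TaskContext and execute calls mirror the helper):

```rust
use std::sync::Arc;

use arrow_array::{ArrayRef, Int64Array, RecordBatch};
use arrow_schema::{DataType, Field, Schema};
use datafusion::datasource::MemTable;
use datafusion::execution::context::{SessionContext, TaskContext};
use futures::StreamExt;

#[tokio::main]
async fn main() -> datafusion::error::Result<()> {
    // A toy single-column table registered in memory.
    let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int64, false)]));
    let batch = RecordBatch::try_new(
        schema.clone(),
        vec![Arc::new(Int64Array::from(vec![1, 2, 3])) as ArrayRef],
    )?;
    let ctx = SessionContext::new();
    ctx.register_table("t", Arc::new(MemTable::try_new(schema, vec![vec![batch]])?))?;

    let plan = ctx.sql("SELECT * FROM t").await?.create_physical_plan().await?;

    // One stream per output partition, exactly the calls plan_to_streams makes;
    // batches are pulled lazily instead of being collected into one Vec.
    let task_ctx = Arc::new(TaskContext::from(&ctx.state()));
    for i in 0..plan.output_partitioning().partition_count() {
        let mut stream = plan.execute(i, task_ctx.clone())?;
        while let Some(batch) = stream.next().await {
            println!("partition {i}: {} rows", batch?.num_rows());
        }
    }
    Ok(())
}
```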
@@ -375,20 +399,18 @@ async fn write_execution_plan_with_predicate(
 
     // Write data to disk
     let mut tasks = vec![];
-    for i in 0..plan.output_partitioning().partition_count() {
-        let inner_plan = plan.clone();
-        let inner_schema = schema.clone();
-        let task_ctx = Arc::new(TaskContext::from(&state));
+    for mut stream in write_data {
+        let inner_schema = target_schema.clone();
 
         let config = WriterConfig::new(
-            inner_schema.clone(),
+            target_schema.clone(),
             partition_columns.clone(),
             writer_properties.clone(),
             target_file_size,
             write_batch_size,
         );
         let mut writer = DeltaWriter::new(object_store.clone(), config);
-        let checker_stream = checker.clone();
-        let mut stream = inner_plan.execute(i, task_ctx)?;
+        let checker_stream: DeltaDataChecker = checker.clone();
         let handle: tokio::task::JoinHandle<DeltaResult<Vec<Action>>> =
             tokio::task::spawn(async move {
                 while let Some(maybe_batch) = stream.next().await {
@@ -441,15 +463,15 @@ pub(crate) async fn write_execution_plan(
     write_execution_plan_with_predicate(
         None,
         snapshot,
-        state,
-        plan,
+        plan_to_streams(state, plan)?,
         partition_columns,
         object_store,
         target_file_size,
         write_batch_size,
         writer_properties,
         safe_cast,
         schema_mode,
+        None,
     )
     .await
 }
@@ -654,95 +676,32 @@ impl std::future::IntoFuture for WriteBuilder {
                 _ => (None, None),
             };
 
-            match input_data {
+            let write_data = match input_data {
                 None => return Err(WriteError::MissingData.into()),
-                Some(WriteData::DataFusionPlan(plan)) => {
-                    let add_actions = write_execution_plan_with_predicate(
-                        predicate.clone(),
-                        this.snapshot.as_ref(),
-                        state.clone(),
-                        plan,
-                        partition_columns.clone(),
-                        this.log_store.object_store().clone(),
-                        this.target_file_size,
-                        this.write_batch_size,
-                        this.writer_properties.clone(),
-                        this.safe_cast,
-                        this.schema_mode,
-                    )
-                    .await?;
-                    actions.extend(add_actions);
-                }
+                Some(WriteData::DataFusionPlan(plan)) => plan_to_streams(state.clone(), plan)?,
                 Some(WriteData::RecordBatches((input_batches, _))) => {
-                    for batches in ChunksIterator::new(input_batches, 10) {
-                        let data = if !partition_columns.is_empty() {
-                            // TODO partitioning should probably happen in its own plan ...
-                            let mut partitions: HashMap<String, Vec<RecordBatch>> = HashMap::new();
-                            for batch in batches {
-                                let real_batch = match new_schema.clone() {
-                                    Some(new_schema) => {
-                                        cast_record_batch(&batch, new_schema, false, true)?
-                                    }
-                                    None => batch,
-                                };
-
-                                let divided = divide_by_partition_values(
-                                    target_schema.clone(),
-                                    partition_columns.clone(),
-                                    &real_batch,
-                                )?;
-                                for part in divided {
-                                    let key = part.partition_values.hive_partition_path();
-                                    match partitions.get_mut(&key) {
-                                        Some(part_batches) => {
-                                            part_batches.push(part.record_batch);
-                                        }
-                                        None => {
-                                            partitions.insert(key, vec![part.record_batch]);
-                                        }
-                                    }
-                                }
-                            }
-                            partitions.into_values().collect::<Vec<_>>()
-                        } else {
-                            match new_schema {
-                                Some(ref new_schema) => {
-                                    let mut new_batches = vec![];
-                                    for batch in batches {
-                                        new_batches.push(cast_record_batch(
-                                            &batch,
-                                            new_schema.clone(),
-                                            false,
-                                            true,
-                                        )?);
-                                    }
-                                    vec![new_batches]
-                                }
-                                None => vec![batches],
-                            }
-                        };
-
-                        let plan =
-                            Arc::new(MemoryExec::try_new(&data, target_schema.clone(), None)?);
-
-                        let add_actions = write_execution_plan_with_predicate(
-                            predicate.clone(),
-                            this.snapshot.as_ref(),
-                            state.clone(),
-                            plan,
-                            partition_columns.clone(),
-                            this.log_store.object_store().clone(),
-                            this.target_file_size,
-                            this.write_batch_size,
-                            this.writer_properties.clone(),
-                            this.safe_cast,
-                            this.schema_mode,
-                        )
-                        .await?;
-                        actions.extend(add_actions);
-                    }
+                    let stream = futures::stream::iter(input_batches.map(Ok));
+                    let rec_batch_stream =
+                        Box::pin(RecordBatchStreamAdapter::new(input_schema.clone(), stream))
+                            as SendableRecordBatchStream;
+                    vec![rec_batch_stream]
                 }
-            }
+            };
+            let add_actions = write_execution_plan_with_predicate(
+                predicate.clone(),
+                this.snapshot.as_ref(),
+                write_data,
+                partition_columns.clone(),
+                this.log_store.object_store().clone(),
+                this.target_file_size,
+                this.write_batch_size,
+                this.writer_properties.clone(),
+                this.safe_cast,
+                this.schema_mode,
+                None,
+            )
+            .await?;
+            actions.extend(add_actions);
             // Here we need to validate if the new data conforms to a predicate if one is provided
 
             if this.schema_mode == Some(SchemaMode::Merge) && schema_drift {
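
For the RecordBatches input path, the diff above wraps the incoming batch iterator in DataFusion's RecordBatchStreamAdapter (via input_batches.map(Ok)) rather than buffering batches into a MemoryExec. A minimal sketch of that wrapping, assuming the same arrow and datafusion crates the diff imports; the schema and batch contents here are toy data:

```rust
use std::sync::Arc;

use arrow_array::{ArrayRef, Int64Array, RecordBatch};
use arrow_schema::{DataType, Field, Schema};
use datafusion::error::DataFusionError;
use datafusion::execution::SendableRecordBatchStream;
use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
use futures::StreamExt;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int64, false)]));

    // An iterator of batches; nothing here forces them all into memory at once.
    let batch_schema = schema.clone();
    let batches = (0..3i64).map(move |i| -> Result<RecordBatch, DataFusionError> {
        RecordBatch::try_new(
            batch_schema.clone(),
            vec![Arc::new(Int64Array::from(vec![i])) as ArrayRef],
        )
        .map_err(Into::into)
    });

    // Adapt the iterator into a SendableRecordBatchStream, as the new
    // RecordBatches arm of WriteBuilder does with input_batches.map(Ok).
    let mut stream: SendableRecordBatchStream =
        Box::pin(RecordBatchStreamAdapter::new(schema, futures::stream::iter(batches)));

    while let Some(batch) = stream.next().await {
        println!("rows: {}", batch?.num_rows());
    }
    Ok(())
}
```

The adapter simply pairs a schema with any stream of Result<RecordBatch>, which is what lets the DataFusion-plan path and the raw-batches path feed the same writer loop.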