Make usages of Eigen::array compatible with std::array. #64646

Draft: wants to merge 1 commit into master
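Context for the change: when Eigen::array is an alias for std::array, as it is in recent Eigen configurations, only aggregate (brace) initialization is available, while Eigen's emulated array type additionally provides value constructors. The hunks below therefore switch parenthesized construction to brace initialization so the same code compiles in both configurations. A minimal sketch of the distinction, assuming only standard Eigen headers (illustrative code, not part of the PR):

#include <array>
#include <unsupported/Eigen/CXX11/Tensor>  // provides Eigen::array and Eigen::DenseIndex

int main() {
  // Brace initialization compiles whether Eigen::array is Eigen's emulated
  // type or an alias for std::array.
  Eigen::array<Eigen::DenseIndex, 2> transpose{1, 0};
  std::array<Eigen::DenseIndex, 2> same{1, 0};

  // Parenthesized construction compiles only with the emulated type, which
  // defines value constructors; std::array has no such constructor:
  //   Eigen::array<Eigen::DenseIndex, 2> bad(1, 0);

  return (transpose[0] == same[0] && transpose[1] == same[1]) ? 0 : 1;
}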
1 change: 1 addition & 0 deletions tensorflow/core/distributed_runtime/BUILD
@@ -779,6 +779,7 @@ tf_cuda_cc_test(
"//tensorflow/core/kernels:identity_op",
"//tensorflow/core/kernels:variable_ops",
"//tensorflow/core/protobuf:master_proto_cc",
"@eigen_archive//:eigen3",
] + tf_grpc_cc_dependencies(),
)

6 changes: 3 additions & 3 deletions tensorflow/core/distributed_runtime/master_test.cc
@@ -19,7 +19,7 @@ limitations under the License.
#include <memory>

#include "grpcpp/grpcpp.h"

#include "Eigen/Core" // from @eigen_archive
#include "tensorflow/core/distributed_runtime/rpc/grpc_channel.h"
#include "tensorflow/core/distributed_runtime/rpc/grpc_master_service_impl.h"
#include "tensorflow/core/distributed_runtime/rpc/grpc_testlib.h"
@@ -389,8 +389,8 @@ TEST_F(MasterTest, EigenProblem) {
TF_CHECK_OK(CreateSession(def, &handle, &initial_version));

// Temps supporting the computation of the convergence condition.
const Eigen::array<Eigen::DenseIndex, 1> sum_along_dim(0);
const Eigen::array<Eigen::DenseIndex, 2> matrix_transpose({1, 0});
const Eigen::array<Eigen::DenseIndex, 1> sum_along_dim{0};
const Eigen::array<Eigen::DenseIndex, 2> matrix_transpose{1, 0};
Tensor x(DT_FLOAT, TensorShape({2, 1}));
Tensor y(DT_FLOAT, TensorShape({2, 1}));
Eigen::Tensor<float, 1, Eigen::RowMajor> y_square_sum;
11 changes: 7 additions & 4 deletions tensorflow/core/kernels/gather_nd_op_gpu.cu.cc
@@ -39,11 +39,14 @@ __global__ void GatherSliceOpKernel(
const auto indices_i = indices + IXDIM * loc;
bool out_of_bounds = false;
Index offset = 0;
// Avoid empty std::array access, which fails to compile on GPU.
if constexpr (IXDIM > 0) {
#pragma unroll
for (int j = 0; j < IXDIM; ++j) {
const Index index_j = ldg(indices_i + j);
out_of_bounds |= !FastBoundsCheck(index_j, batch_indices[j]);
offset += batch_strides[j] * index_j;
for (int j = 0; j < IXDIM; ++j) {
const Index index_j = ldg(indices_i + j);
out_of_bounds |= !FastBoundsCheck(index_j, batch_indices[j]);
offset += batch_strides[j] * index_j;
}
}
// TODO(ebrevdo):
// This is the only part that depends on the offset. The part
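A note on the guard added above: with IXDIM == 0, batch_indices and batch_strides are zero-length arrays, and once Eigen::array is std::array the subscript inside the unrolled loop can be rejected by the device compiler even though the loop never executes; if constexpr removes the loop from the instantiation entirely. A reduced host-only illustration of the pattern (simplified stand-in names, not TensorFlow code):

#include <array>

template <int IXDIM>
int FlattenIndex(const std::array<int, IXDIM>& strides,
                 const std::array<int, IXDIM>& indices) {
  int offset = 0;
  // Guard the body so the zero-extent specialization never instantiates an
  // array subscript; this mirrors the change in GatherSliceOpKernel.
  if constexpr (IXDIM > 0) {
    for (int j = 0; j < IXDIM; ++j) {
      offset += strides[j] * indices[j];
    }
  }
  return offset;
}

int main() {
  std::array<int, 2> strides{4, 1};
  std::array<int, 2> indices{1, 2};
  int a = FlattenIndex<2>(strides, indices);  // 1*4 + 2*1 == 6
  int b = FlattenIndex<0>({}, {});            // compiles and returns 0
  return (a == 6 && b == 0) ? 0 : 1;
}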
40 changes: 23 additions & 17 deletions tensorflow/core/kernels/image/adjust_contrast_op.cc
@@ -248,17 +248,18 @@ class AdjustContrastOpv2<CPUDevice, float> : public AdjustContrastOpV2Base {
TTypes<float, 1>::Tensor mean_flat(&mean(0, 0), mean.size());
TTypes<float, 1>::Tensor summation_scratch(&scratch(0, 0, 0),
scratch.size());
using Eigen::DenseIndex;
typedef Eigen::array<Eigen::DenseIndex, 1> Index;
const int64_t plane_size = image_size * channels;
// Since the number of channels in the early layers is often small, a
// straightforward loop for summing cannot utilize vectorization.
// This algorithm repeatedly folds each image plane by half, until
// only one set of channels remains.
for (int64_t i = 0; i < batch; i++) {
auto input_plane =
input_flat.slice(Index(i * plane_size), Index(plane_size));
auto summation_plane =
summation_scratch.slice(Index(i * plane_size), Index(plane_size));
auto input_plane = input_flat.slice(Index{DenseIndex(i * plane_size)},
Index{DenseIndex(plane_size)});
auto summation_plane = summation_scratch.slice(
Index{DenseIndex(i * plane_size)}, Index{DenseIndex(plane_size)});
int64_t remaining_size = image_size;
int round = 0;
// Sum the input(i, :, k) into mean(i, k). Repeatedly splits the input
@@ -289,36 +289,41 @@ class AdjustContrastOpv2<CPUDevice, float> : public AdjustContrastOpV2Base {
if (round == 0) {
// In the first round, sum the left side and right side of the input
// array into the summation area.
summation_plane.slice(Index(0), Index(right_size * channels)) =
input_plane.slice(Index(left_size * channels),
Index(right_size * channels)) +
input_plane.slice(Index(0), Index(right_size * channels));
summation_plane.slice(Index{0},
Index{DenseIndex(right_size * channels)}) =
input_plane.slice(Index{DenseIndex(left_size * channels)},
Index{DenseIndex(right_size * channels)}) +
input_plane.slice(Index{0},
Index{DenseIndex(right_size * channels)});
if (left_size > right_size) {
DCHECK_EQ(left_size - right_size, 1);
// Copy over the remaining column if the remaining_size is odd.
// This also handles the case where image_size == 1.
summation_plane.slice(Index(right_size * channels),
Index(channels)) =
input_plane.slice(Index(right_size * channels),
Index(channels));
summation_plane.slice(Index{DenseIndex(right_size * channels)},
Index{DenseIndex(channels)}) =
input_plane.slice(Index{DenseIndex(right_size * channels)},
Index{DenseIndex(channels)});
}
} else {
// For all the remaining rounds, add the second half of the inputs
// into the first half of the inputs. With the flat structure and
// large size, this utilizes vectorization between components.
summation_plane.slice(Index(0), Index(right_size * channels)) +=
summation_plane.slice(Index(left_size * channels),
Index(right_size * channels));
summation_plane.slice(Index{0},
Index{DenseIndex(right_size * channels)}) +=
summation_plane.slice(Index{DenseIndex(left_size * channels)},
Index{DenseIndex(right_size * channels)});
}
remaining_size = left_size;
round++;
} while (remaining_size > 1);
const float mean_scaling = 1.0f / image_size;
// The first channels elements in summation_plane now holds the summation.
// Scale it with image_size and copy over to the means.
auto mean_plane = mean_flat.slice(Index(i * channels), Index(channels));
auto mean_plane = mean_flat.slice(Index{DenseIndex(i * channels)},
Index{DenseIndex(channels)});
mean_plane =
summation_plane.slice(Index(0), Index(channels)) * mean_scaling;
summation_plane.slice(Index{0}, Index{DenseIndex(channels)}) *
mean_scaling;
}
}

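The comments in this kernel describe a fold-by-half reduction that keeps the per-plane summation vectorizable even when channels is small. A rough standalone sketch of that reduction on a flat buffer (plain loops instead of Eigen tensor slices; sizes are arbitrary):

#include <cstdio>
#include <vector>

// Repeatedly folds the right half of `plane` onto its left half until a
// single group of `channels` values remains; each fold adds one long
// contiguous run, which is friendlier to vectorization than striding
// through the plane channel by channel.
std::vector<float> FoldSum(std::vector<float> plane, int channels) {
  int remaining = static_cast<int>(plane.size()) / channels;
  while (remaining > 1) {
    const int right = remaining / 2;
    const int left = remaining - right;  // left >= right, off by at most one
    for (int k = 0; k < right * channels; ++k) {
      plane[k] += plane[left * channels + k];
    }
    remaining = left;
  }
  plane.resize(channels);
  return plane;
}

int main() {
  // 3 "pixels" with 2 channels each; per-channel sums are 9 and 12.
  std::vector<float> plane{1, 2, 3, 4, 5, 6};
  for (float v : FoldSum(plane, 2)) std::printf("%g ", v);  // prints: 9 12
  return 0;
}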
@@ -132,8 +132,6 @@ __global__ void __launch_bounds__(1024)
(normMax >= T(0.))) ||
((normMax > kStdDevsInsideBoundsToUseRandnSampler) &&
(normMin <= T(0.)))) {
Eigen::array<T, 4> n;

int numIterations = 0;
while (numIterations < kMaxIterations) {
const auto randn = normal_dist(&gen);
2 changes: 0 additions & 2 deletions tensorflow/core/kernels/random_binomial_op.cc
@@ -187,8 +187,6 @@ struct RandomBinomialFunctor<CPUDevice, T, U> {
&gen, &output](int64_t start_output, int64_t limit_output) {
// Vectorized intermediate calculations for uniform rejection sampling.
// We always generate at most 4 samples.
Eigen::array<T, 4> z;
Eigen::array<T, 4> g;
const bool should_bcast = bcast.IsBroadcastingRequired();
const auto& counts_batch_indices = bcast.x_batch_indices();
const auto& probs_batch_indices = bcast.y_batch_indices();
3 changes: 2 additions & 1 deletion tensorflow/core/kernels/sparse_tensor_dense_matmul_op.cc
@@ -19,6 +19,7 @@ limitations under the License.

#include "tensorflow/core/kernels/sparse_tensor_dense_matmul_op.h"

#include "Eigen/Core" // from @eigen_archive
#include "tensorflow/core/framework/bounds_check.h"
#include "tensorflow/core/framework/op.h"
#include "tensorflow/core/framework/op_kernel.h"
@@ -310,7 +311,7 @@ Status SparseTensorDenseMatMulImpl(
if (ADJ_B) {
// Perform transpose and conjugation on B once, since we chip out B's
// columns in the nnz loop.
Eigen::array<int, 2> shuffle(1, 0); // preserve dimension order
Eigen::array<int, 2> shuffle{1, 0}; // preserve dimension order
Eigen::Tensor<T, 2, Eigen::ColMajor> col_major_conj_b =
b.swap_layout().shuffle(shuffle).conjugate();
LOOP_NNZ(col_major_conj_b);
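For reference, the shuffle index list {1, 0} permutes the two tensor dimensions (a transpose); only its initialization syntax changes in this hunk. A small self-contained illustration of the idiom (not TensorFlow code):

#include <unsupported/Eigen/CXX11/Tensor>

int main() {
  Eigen::Tensor<float, 2, Eigen::RowMajor> m(2, 3);
  m.setValues({{1.f, 2.f, 3.f}, {4.f, 5.f, 6.f}});

  // Brace-initialized index list works for both Eigen::array and std::array.
  Eigen::array<int, 2> shuffle{1, 0};
  Eigen::Tensor<float, 2, Eigen::RowMajor> mt = m.shuffle(shuffle);

  // mt is 3x2 with mt(i, j) == m(j, i).
  return (mt.dimension(0) == 3 && mt(0, 1) == 4.f) ? 0 : 1;
}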
@@ -1036,10 +1036,6 @@ static void PackLhsHelper(::testing::benchmark::State& state,
reshape_dims[0] = filter_count;
reshape_dims[1] = input_depth * filter_rows * filter_cols;

// We are going to contract along the 'in_depth * filter_rows * filter_cols`.
nocontract_t nocontract_dim = {0};
contract_t contract_dim = {1};

// These values computed using the algorithm in TensorContraction.h, with
// 'nocontract_dim' and 'contract_dim' values specified above.
nocontract_t nocontract_strides = {1};
1 change: 1 addition & 0 deletions third_party/xla/xla/pjrt/gpu/BUILD
@@ -91,6 +91,7 @@ cc_library(
"@com_google_absl//absl/container:flat_hash_map",
"@com_google_absl//absl/container:inlined_vector",
"@com_google_absl//absl/functional:any_invocable",
"@com_google_absl//absl/functional:bind_front",
"@com_google_absl//absl/log",
"@com_google_absl//absl/log:check",
"@com_google_absl//absl/memory",
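The new bind_front dependency is used in se_gpu_pjrt_client.cc below to bind the resolved destination pointer to the deferred copy callback; absl::bind_front(f, a) returns a callable that invokes f(a, rest...). The sketch below shows the shape of the call with the equivalent C++20 std::bind_front (illustrative names only):

#include <functional>
#include <iostream>
#include <string>

void Copy(std::string dst_name, int nbytes) {
  std::cout << "copy " << nbytes << " bytes into " << dst_name << "\n";
}

int main() {
  // Bind the leading argument now and supply the rest later;
  // absl::bind_front behaves the same way and does not require C++20.
  auto copy_into_host = std::bind_front(Copy, std::string("host_buffer"));
  copy_into_host(128);  // prints: copy 128 bytes into host_buffer
  return 0;
}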
183 changes: 114 additions & 69 deletions third_party/xla/xla/pjrt/gpu/se_gpu_pjrt_client.cc
@@ -33,6 +33,7 @@ limitations under the License.
#include "absl/container/flat_hash_map.h"
#include "absl/container/inlined_vector.h"
#include "absl/functional/any_invocable.h"
#include "absl/functional/bind_front.h"
#include "absl/log/check.h"
#include "absl/log/log.h"
#include "absl/memory/memory.h"
@@ -504,97 +505,141 @@ StreamExecutorGpuClient::GetDefaultDeviceAssignment(int num_replicas,
}

PjRtFuture<> StreamExecutorGpuClient::CopyRawSubBufferToHost(
PjRtBuffer* pjrt_buffer, void* dst, int64_t offset, int64_t transfer_size) {
PjRtBuffer* pjrt_buffer, PjRtFuture<void*> dst, int64_t offset,
int64_t transfer_size) {
auto* buffer = tensorflow::down_cast<PjRtStreamExecutorBuffer*>(pjrt_buffer);
DCHECK(buffer);
PjRtStreamExecutorDevice* device = buffer->device();
LocalDeviceState* local_device = device->local_device_state();
se::Stream* stream = local_device->GetDeviceToHostStream();

// Acquire the usage hold inline so that the buffer is kept alive even if
// `dst` is not immediately available.
PjRtStreamExecutorBuffer::ScopedHold hold(buffer->GetBufferWithUsageHold());
if (!hold.ok()) {
return PjRtFuture<>(hold.status());
}

auto device_buffer = hold.buffer();
if (device_buffer->device_memory().size() != 1) {
return PjRtFuture<>(InvalidArgument("Copy raw buffer called on tuple"));
}
auto& device_memory = device_buffer->device_memory()[0];
if (offset < 0 || offset > device_memory.size() ||
device_memory.size() - offset < transfer_size) {
return PjRtFuture<>(
InvalidArgument("Copy raw buffer called on buffer size %lld with "
"invalid offset %lld, transfer size %lld",
device_memory.size(), offset, transfer_size));
}
WaitForBufferDefinitionEventsOnStream(*device_buffer, stream);
absl::StatusOr<EventPool::Handle> event_or =
local_device->event_pool().AllocateEvent(stream->parent());
if (!event_or.ok()) {
return PjRtFuture<>(event_or.status());
}

std::unique_ptr<se::DeviceMemoryBase> sub_buffer;
if (transfer_size < device_memory.size()) {
sub_buffer = std::make_unique<se::DeviceMemoryBase>(
device_memory.GetByteSlice(offset, transfer_size));
} else {
sub_buffer = std::make_unique<se::DeviceMemoryBase>(device_memory);
}
auto promise = PjRtFuture<>::CreatePromise();
auto usage_event =
std::make_shared<BufferSequencingEvent>(this->thread_pool());

if (transfer_size != 0) {
if (should_stage_host_to_device_transfers()) {
if (host_memory_allocator() == nullptr) {
return PjRtFuture<>(InvalidArgument(
"host_memory_allocator should be initialized for staging buffer "
"transfer."));
}
void* ptr = host_memory_allocator()->AllocateRaw(
tsl::Allocator::kAllocatorAlignment, transfer_size);
// When using the ComputeSynchronized allocation model, retain a reference to
// the device_buffer until the copy completes, to ensure that the buffer isn't
// deleted or donated while it is still in use. The choice of retaining a
// reference at the host is a heuristic; the alternative is to ensure, before
// freeing the buffer, that the compute stream is synchronized past the
// transfer, but it seems better to hold onto the buffer too long than to
// stall the compute stream.
hold.ConvertUsageHold(stream, usage_event, /*reference_held=*/true);

auto async_copy = [this, promise, offset, transfer_size, stream, local_device,
device_buffer, usage_event = std::move(usage_event)](
absl::StatusOr<void*> dst) mutable {
absl::StatusOr<EventPool::Handle> event =
local_device->event_pool().AllocateEvent(stream->parent());
if (!event.ok()) {
promise.Set(event.status());
return;
}

std::shared_ptr<void> staging_buffer = std::shared_ptr<void>(
ptr, [host_memory_allocator = host_memory_allocator()](void* ptr) {
host_memory_allocator->DeallocateRaw(ptr);
});
if (auto status =
stream->Memcpy(staging_buffer.get(), *sub_buffer, transfer_size);
!status.ok()) {
return PjRtFuture<>(status);
}
auto copy_to_staging_buffer = [dst, transfer_size,
staging_buffer]() mutable {
std::memcpy(dst, staging_buffer.get(), transfer_size);
};
if (auto status = stream->DoHostCallback(copy_to_staging_buffer);
!status.ok()) {
return PjRtFuture<>(status);
}
absl::Status defined_status =
device_buffer->definition_events()[0]->GetDefinedStatus();
if (!defined_status.ok()) {
promise.Set(defined_status);
return;
}

auto& device_memory = device_buffer->device_memory()[0];
if (offset < 0 || offset > device_memory.size() ||
device_memory.size() - offset < transfer_size) {
promise.Set(
InvalidArgument("Copy raw buffer called on buffer size %lld with "
"invalid offset %lld, transfer size %lld",
device_memory.size(), offset, transfer_size));
return;
}

std::unique_ptr<se::DeviceMemoryBase> sub_buffer;
if (transfer_size < device_memory.size()) {
sub_buffer = std::make_unique<se::DeviceMemoryBase>(
device_memory.GetByteSlice(offset, transfer_size));
} else {
// D2H request holds a non-owned pointer into sub_buffer base address
// that needs to outlive the transfer until the stream callback is
// invoked.
auto status = stream->Memcpy(dst, *sub_buffer, transfer_size);
if (!status.ok()) {
return PjRtFuture<>(status);
sub_buffer = std::make_unique<se::DeviceMemoryBase>(device_memory);
}

WaitForBufferDefinitionEventsOnStream(*device_buffer, stream);

if (transfer_size != 0) {
if (should_stage_host_to_device_transfers()) {
if (host_memory_allocator() == nullptr) {
promise.Set(InvalidArgument(
"host_memory_allocator should be initialized for staging buffer "
"transfer."));
return;
}
void* ptr = host_memory_allocator()->AllocateRaw(
tsl::Allocator::kAllocatorAlignment, transfer_size);

std::shared_ptr<void> staging_buffer = std::shared_ptr<void>(
ptr, [host_memory_allocator = host_memory_allocator()](void* ptr) {
host_memory_allocator->DeallocateRaw(ptr);
});
if (auto status = stream->Memcpy(staging_buffer.get(), *sub_buffer,
transfer_size);
!status.ok()) {
promise.Set(std::move(status));
return;
}
auto copy_to_staging_buffer = [dst, transfer_size,
staging_buffer]() mutable {
std::memcpy(*dst, staging_buffer.get(), transfer_size);
};
if (auto status = stream->DoHostCallback(copy_to_staging_buffer);
!status.ok()) {
promise.Set(std::move(status));
return;
}
} else {
// D2H request holds a non-owned pointer into sub_buffer base address
// that needs to outlive the transfer until the stream callback is
// invoked.
auto status = stream->Memcpy(*dst, *sub_buffer, transfer_size);
if (!status.ok()) {
promise.Set(std::move(status));
return;
}
}
}
}

auto usage_event =
std::make_shared<BufferSequencingEvent>(this->thread_pool());
local_device->event_pool().ThenRecordEvent(stream, event_or.value());
usage_event->SetSequencingEvent(std::move(event_or).value(), stream);
// This usage hold will prevent device_buffer from being deleted before
// the transfer is complete.
hold.ConvertUsageHold(stream, std::move(usage_event),
/*reference_held=*/false);
local_device->event_pool().ThenRecordEvent(stream, event.value());
usage_event->SetSequencingEvent(std::move(event).value(), stream);

auto promise = PjRtFuture<>::CreatePromise();
auto callback_status = local_device->ThenExecuteCallback(
stream, [promise]() mutable { promise.Set(); });
if (!callback_status.ok()) {
return PjRtFuture<>(callback_status);
}
auto callback_status = local_device->ThenExecuteCallback(
stream, [promise, device_buffer = std::move(device_buffer)]() mutable {
promise.Set();
});
if (!callback_status.ok()) {
promise.Set(std::move(callback_status));
return;
}
};

device_buffer->definition_events()[0]->ExecuteOrAddToFutureTasks(
absl::StrFormat("async_copy_raw_sub_buffer_to_host_%p", &async_copy),
[this, dst, async_copy = std::move(async_copy)]() mutable {
dst.OnReady([this, async_copy = std::move(async_copy)](
absl::StatusOr<void*> dst) {
// Trampoline through a thread pool since GPUs do not allow calling
// D2H inside the callback's context.
thread_pool()->Schedule(absl::bind_front(async_copy, std::move(dst)));
});
});

return PjRtFuture<>(
std::move(promise),
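Beyond the Eigen::array cleanup, this hunk changes CopyRawSubBufferToHost to accept the destination as PjRtFuture<void*>: the usage hold is converted up front so the device buffer stays alive, and the device-to-host copy runs only after both the buffer's definition event and dst resolve, trampolined through a thread pool because the copy cannot be issued from the callback's context. A toy sketch of that control flow using standard-library futures (no PjRt or StreamExecutor APIs; all names are illustrative):

#include <cstddef>
#include <cstring>
#include <functional>
#include <future>
#include <iostream>
#include <thread>

// Toy stand-in for the deferred-destination pattern: the copy is scheduled
// now, but only runs once the destination pointer becomes available.
using DstFuture = std::shared_future<void*>;

void CopyRawToHost(DstFuture dst, const char* device_data, std::size_t size,
                   std::function<void()> on_done) {
  // The real code converts the buffer's usage hold before `dst` is known and
  // then hops to a thread pool; a detached thread plays that role here.
  std::thread([dst, device_data, size, on_done = std::move(on_done)] {
    void* ptr = dst.get();                // wait for the destination pointer
    std::memcpy(ptr, device_data, size);  // stands in for stream->Memcpy
    on_done();                            // stands in for promise.Set()
  }).detach();
}

int main() {
  std::promise<void*> dst_promise;
  std::promise<void> done;
  char host[6] = {};
  CopyRawToHost(dst_promise.get_future().share(), "hello", 6,
                [&done] { done.set_value(); });
  dst_promise.set_value(host);  // the destination arrives later
  done.get_future().wait();
  std::cout << host << "\n";    // prints: hello
  return 0;
}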