Tensorflow-IO DAOS Plugin #1603

Open
wants to merge 91 commits into
base: master
Changes from 1 commit
7369e32
Skeleton in Place + Build Correctly
Sep 6, 2021
b055a0c
Merge branch 'FT-dfs-skeleton-OM' into 'devel'
omar-hmarzouk Sep 10, 2021
230aadd
Parsing Function Added and Tested Separately
Sep 10, 2021
a9b128b
Merge branch '2-parse-dfs-path' into 'devel'
omar-hmarzouk Sep 10, 2021
17a21d7
DAOS library installed as an http archive and linked
Sep 12, 2021
20900e9
Removed DAOS API call test
Sep 12, 2021
c1970e9
Linking DAOS Shared Libraries
Sep 12, 2021
6441bf0
Merge branch 'FT-daos-lib-integration-OM' into 'devel'
omar-hmarzouk Sep 12, 2021
b1c0128
Added Skeleton + Connect/Disconnect Functionality
Sep 15, 2021
e3654b9
Merge branch '4-plugin-skeleton' into 'devel'
omar-hmarzouk Sep 15, 2021
a45d6e4
Init+Cleanup+Mount+Unmount
Sep 15, 2021
903b89f
Query + Moving Class and Helpers to header file
Sep 16, 2021
477e81a
Added Path Lookup Functionality
Sep 20, 2021
ae6f84a
Support for Multiple Connections
Sep 21, 2021
9a3f111
Merge branch 'FT-filesystem-ops-OM' into 'devel'
omar-hmarzouk Sep 21, 2021
9f29eee
Directory Checking + Creation & Deletion (Singl/Recursive)
Sep 23, 2021
14073c5
Merge branch 'FT-directory-operation-OM' into 'devel'
omar-hmarzouk Sep 23, 2021
8dd9fbb
File Size Calculation
Sep 23, 2021
c8e261a
File Deletion
Sep 23, 2021
958b645
Creation of Random Access + Writable + Appendable Operations
Sep 25, 2021
96647da
Rename/Moving of File
Sep 26, 2021
038dfce
Completed FileSystem Operations Table
Sep 27, 2021
2f29df0
Merge branch 'FT-file-ops-OM' into 'devel'
omar-hmarzouk Sep 27, 2021
5b2d864
Refactor
Sep 28, 2021
b14fc84
Merge branch '10-refactor-of-dfs-plugin-class' into 'devel'
omar-hmarzouk Sep 28, 2021
10b09de
Writable File Ops Done
Sep 29, 2021
26b0f36
Merge branch 'FT-writable-file-ops-OM' into 'devel'
omar-hmarzouk Sep 29, 2021
83fd151
Random Access File Ops Done
Sep 29, 2021
6810f04
Merge branch 'FT-random-access-ops-OM' into 'devel'
omar-hmarzouk Sep 29, 2021
ec328c1
Tests Added (Bug in isDirectory and Wildcard Matching)
Oct 6, 2021
cadd6f6
Tests completed & passed & wildcard matching to be checked
Oct 10, 2021
dc664ae
Added Tutorial
Oct 14, 2021
366d63c
Tutorial tested and configured
Oct 14, 2021
8a8cabf
Implementation of Wildcard Matching
Oct 19, 2021
3614d0c
Merge branch '13-wildcard-matching' into FT-tutorial-example-OM
Oct 19, 2021
866ebd7
Merge branch '13-wildcard-matching' into 'devel'
omar-hmarzouk Oct 19, 2021
36e93cf
Merge branch 'FT-tutorial-example-OM' into 'devel'
omar-hmarzouk Oct 19, 2021
6ee86d0
Added Dummy ROM-Region
Oct 27, 2021
3745d4f
Merge branch 'FT-rom-region-dummy-OM' into 'devel'
omar-hmarzouk Oct 27, 2021
c0ba4c4
Update to DAOS1.3.106 + Decoupling of DAOS API init + Handling Pool a…
Dec 5, 2021
a764fdf
Adjusted Example + Added Build Documentation + Fixed Indentation
Dec 5, 2021
d5016e5
Refactor + Update Tests + Update Docs
Dec 6, 2021
eebe0af
Markdown updated
MernaMoawad Dec 6, 2021
9b3baf0
UNS NO_CHECK_PATH
Dec 6, 2021
7035255
Linking DAOS
Dec 7, 2021
7c1c736
Linking DAOS libraries
Dec 7, 2021
9d72efd
Updated Docs
Dec 8, 2021
b823fa1
Merge branch 'devel' of https://github.com/daos-stack/tensorflow-io-d…
Dec 8, 2021
a2cc29e
Updating Docs and moving them to docs/
Dec 8, 2021
4a8b3ec
Updated Docs
Dec 10, 2021
ba9074f
Merge branch 'devel' into FT-unified-name-space-OM
Dec 10, 2021
ea94561
UNS Supports UID and Label
Dec 12, 2021
a8f0a85
Merge branch 'FT-unified-name-space-OM' into 'devel'
omar-hmarzouk Dec 12, 2021
efbb408
Updated Tutorial + Documentation
Dec 15, 2021
5027ff7
Updated Notebook
Dec 15, 2021
62b092c
Cleared Output
Dec 15, 2021
c83c378
Output Cleared
Dec 15, 2021
b827a7b
Style and Formatting
Dec 16, 2021
de0a601
Merge branch 'tensorflow:master' into devel
omar-hmarzouk Jan 2, 2022
7840b77
PyLint modifications
Jan 4, 2022
9987dd8
Merge branch 'tensorflow:master' into devel
omar-hmarzouk Jan 4, 2022
df84926
Bazel lint
Jan 5, 2022
5ef973c
Merge branch 'devel' of https://github.com/daos-stack/tensorflow-io-d…
Jan 5, 2022
24641ce
Integrating daos build changes
Jan 30, 2022
673ffba
Linting
Jan 30, 2022
0cf005c
Replacing usage of C++ API
Jan 30, 2022
c6dab2a
Formatted DAOS notebook
Feb 2, 2022
5cf7335
Pool Connection Error Handling
Feb 22, 2022
0858a52
Synchronous Read Ahead
Mar 8, 2022
9689f99
Asynchronous read ahed
Mar 10, 2022
db74d80
Merge remote-tracking branch 'original-repo/master' into 17-read-ahea…
Apr 2, 2022
7488654
Finalize Read Ahead
Apr 3, 2022
dbf0bce
Linting
Apr 4, 2022
57ebc1e
Merge branch '17-read-ahead-buffering' into 'devel'
omar-hmarzouk Apr 4, 2022
e8cf9c0
Linting Merge
Apr 4, 2022
e3887d5
Existing File Deletion when Opened in Write Mode
May 6, 2022
3fd58df
Removing Comments
May 6, 2022
7a22a32
Read Ahead Bug Fixes
May 8, 2022
afbc18b
Event Queue De-Centralization
May 10, 2022
79b9d18
Bug Fix
May 10, 2022
fab7f15
Various fixes to the DAOS tensorflow-io plugin. (#2)
krehm Jun 5, 2022
6746017
Merge branch 'tensorflow:master' into devel
omar-hmarzouk Jun 5, 2022
b0d5ad2
Linting
Jun 5, 2022
1af1181
Optimizations Added
Jun 6, 2022
b0a7b7d
Adjustments to Reading, Single Event Queue Handle, Paths Caching, and…
Jun 6, 2022
eb90e69
Add support for dynamically loaded DAOS libraries (#4)
krehm Jun 22, 2022
5f54aee
Linting
Jun 22, 2022
89c39e3
Various additional plugin fixes
krehm Jul 12, 2022
c53f6a0
Various additional plugin fixes (#6)
krehm Jul 25, 2022
66a764a
Adding Patches for Linting and skipping Windows & macOS Build
omar-hmarzouk Jul 26, 2022
83596c8
Merging Upstream
omar-hmarzouk Jul 26, 2022
Synchronous Read Ahead
Omar Marzouk committed Mar 8, 2022
commit 0858a5278434bc73735e99704a6ba76951856850
69 changes: 49 additions & 20 deletions tensorflow_io/core/filesystems/dfs/dfs_filesystem.cc
@@ -11,48 +11,75 @@ typedef struct DFSRandomAccessFile {
std::string dfs_path;
dfs_t* daos_fs;
DAOS_FILE daos_file;
DFSRandomAccessFile(std::string dfs_path, dfs_t* file_system, dfs_obj_t* obj)
std::vector<ReadBuffer> buffers;
daos_size_t file_size;
DFSRandomAccessFile(std::string dfs_path, dfs_t* file_system, daos_handle_t eqh,
dfs_obj_t* obj, size_t num_of_buffers)
: dfs_path(std::move(dfs_path)) {
daos_fs = file_system;
daos_file.file = obj;
dfs_get_size(daos_fs, obj, &file_size);
for(size_t i = 0; i < num_of_buffers; i++) {
buffers.push_back(ReadBuffer(eqh, BUFF_SIZE));
}
}
} DFSRandomAccessFile;

void Cleanup(TF_RandomAccessFile* file) {
auto dfs_file = static_cast<DFSRandomAccessFile*>(file->plugin_file);
for(auto& read_buf: dfs_file->buffers) {
read_buf.AbortEvent();
}
dfs_release(dfs_file->daos_file.file);
dfs_file->daos_fs = nullptr;
delete dfs_file;
}

int64_t Read(const TF_RandomAccessFile* file, uint64_t offset, size_t n,
char* buffer, TF_Status* status) {
d_sg_list_t rsgl;
d_iov_t iov;
int rc;
char* ret, TF_Status* status) {
int rc = 0;
auto dfs_file = static_cast<DFSRandomAccessFile*>(file->plugin_file);
int counter = 0;
for(auto& read_buf: dfs_file->buffers) {
if(read_buf.CacheHit(offset, n)){
rc = read_buf.CopyData(ret, offset, n);
if (rc) {
TF_SetStatus(status, TF_INTERNAL, "");
return 0;
}

if (offset + n > dfs_file->file_size) {
TF_SetStatus(status, TF_OUT_OF_RANGE, "");
return dfs_file->file_size - offset;
}

TF_SetStatus(status, TF_OK, "");
return n;
}
counter++;
}

d_iov_set(&iov, (void*)buffer, n);
rsgl.sg_nr = 1;
rsgl.sg_iovs = &iov;

daos_size_t read_size;
dfs_file->daos_file.offset = offset;
dfs_file->buffers[0].ReadSync(dfs_file->daos_fs, dfs_file->daos_file.file, offset);
rc = dfs_file->buffers[0].CopyData(ret, offset, n);
size_t curr_offset = offset + BUFF_SIZE;
for(size_t i = 1; i < dfs_file->buffers.size(); i++) {
if(curr_offset > dfs_file->file_size) break;
dfs_file->buffers[i].ReadSync(dfs_file->daos_fs, dfs_file->daos_file.file, curr_offset);
curr_offset += BUFF_SIZE;
}

rc = dfs_read(dfs_file->daos_fs, dfs_file->daos_file.file, &rsgl,
dfs_file->daos_file.offset, &read_size, NULL);
if (rc) {
TF_SetStatus(status, TF_INTERNAL, "");
return read_size;
return 0;
}

if (read_size != n) {
if (offset + n > dfs_file->file_size) {
TF_SetStatus(status, TF_OUT_OF_RANGE, "");
return read_size;
return dfs_file->file_size - offset;
}

TF_SetStatus(status, TF_OK, "");
return read_size;
return n;
}

} // namespace tf_random_access_file
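
The lookup-then-refill flow in `Read` can be sketched without DAOS. This is a minimal illustration under stated assumptions, not the plugin's implementation: `MockBuffer`, `Fill`, and `CachedRead` are hypothetical names, and the "file" is an in-memory string. It differs from the diff in three ways: the hit test also requires `valid`, it accepts a request exactly as large as the buffer (`len <= size`, where the diff uses `len < buffer_size`), and the byte count is clamped before any subtraction. In the diff, `dfs_file->file_size - offset` is an unsigned subtraction, so it appears it would wrap if `offset` were ever larger than `file_size`.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <cstring>
#include <string>
#include <vector>

// Hypothetical stand-in for ReadBuffer: caches one window of an in-memory file.
struct MockBuffer {
  std::vector<char> data;  // cached bytes
  std::size_t offset = 0;  // file offset of data[0]
  bool valid = false;      // false until the first Fill()

  explicit MockBuffer(std::size_t size) : data(size) {}

  // True only when the buffer holds valid data covering [pos, pos + len).
  bool CacheHit(std::size_t pos, std::size_t len) const {
    return valid && pos >= offset && len <= data.size() &&
           pos + len <= offset + data.size();
  }

  // Synchronously load the window that starts at `off`.
  void Fill(const std::string& file, std::size_t off) {
    offset = off;
    std::size_t avail = off < file.size() ? file.size() - off : 0;
    std::size_t count = std::min<std::size_t>(avail, data.size());
    if (count > 0) std::memcpy(data.data(), file.data() + off, count);
    valid = true;
  }
};

// Returns the number of bytes copied into `out`. The count is clamped to the
// end of the file and to the buffer size, so a caller may get a short read.
std::size_t CachedRead(MockBuffer& buf, const std::string& file,
                       std::size_t offset, std::size_t n, char* out) {
  if (offset >= file.size()) return 0;  // checked first: no unsigned underflow
  n = std::min<std::size_t>(n, file.size() - offset);
  n = std::min<std::size_t>(n, buf.data.size());
  if (!buf.CacheHit(offset, n)) buf.Fill(file, offset);  // miss: refill at offset
  std::memcpy(out, buf.data.data() + (offset - buf.offset), n);
  return n;
}
```

With a 4-byte buffer over a 10-byte file, a read at offset 0 fills the buffer, a following read at offset 1 is served from it, and a read of 4 bytes at offset 8 refills and returns 2.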
@@ -187,8 +214,10 @@ void NewRandomAccessFile(const TF_Filesystem* filesystem, const char* path,
TF_SetStatus(status, TF_INTERNAL, "Error initializng DAOS API");
return;
}
file->plugin_file =
new tf_random_access_file::DFSRandomAccessFile(path, daos->daos_fs, obj);
auto random_access_file = new tf_random_access_file::DFSRandomAccessFile(path, daos->daos_fs, daos->mEventQueueHandle,
obj, NUM_OF_BUFFERS);
random_access_file->buffers[0].ReadSync(daos->daos_fs, random_access_file->daos_file.file, 0);
file->plugin_file = random_access_file;
TF_SetStatus(status, TF_OK, "");
}

@@ -666,4 +695,4 @@ void ProvideFilesystemSupportFor(TF_FilesystemPluginOps* ops, const char* uri) {

} // namespace dfs
} // namespace io
} // namespace tensorflow
} // namespace tensorflow
131 changes: 129 additions & 2 deletions tensorflow_io/core/filesystems/dfs/dfs_utils.cc
@@ -60,7 +60,9 @@ DFS::DFS() {
is_initialized = false;
}

DFS::~DFS() { free(daos_fs); }
DFS::~DFS() {
free(daos_fs);
}

DFS* DFS::Load() {
if (!is_initialized) {
@@ -73,7 +75,11 @@ DFS* DFS::Load() {
return this;
}

int DFS::dfsInit() { return daos_init(); }
int DFS::dfsInit() {
int rc = daos_init();
if(rc) return rc;
return daos_eq_create(&mEventQueueHandle);
}

void DFS::dfsCleanup() {
Teardown();
@@ -102,6 +108,12 @@ int DFS::Setup(const std::string& path, std::string& pool_string,
}

void DFS::Teardown() {
daos_event_t* temp_event;
int ret;
do {
ret = daos_eq_poll(mEventQueueHandle, 1, -1, 1, &(temp_event));
} while(ret == 1);
daos_eq_destroy(mEventQueueHandle, 0);
Unmount();
ClearConnections();
}
@@ -412,3 +424,118 @@ int DFS::DisconnectContainer(std::string pool_string, std::string cont_string) {
}
return rc;
}

ReadBuffer::ReadBuffer(daos_handle_t eqh, size_t size): buffer_size(size), eqh(eqh) {
buffer = new char[size];
buffer_offset = 0;
event = new daos_event_t;
daos_event_init(event, eqh,nullptr);
valid = false;
}

ReadBuffer::~ReadBuffer() {
if(event != nullptr) {
bool event_status;
daos_event_test(event, 0, &event_status);
daos_event_fini(event);
}
delete [] buffer;
delete event;
}

ReadBuffer::ReadBuffer(ReadBuffer&& read_buffer) {
eqh = read_buffer.eqh;
buffer_size = read_buffer.buffer_size;
buffer = std::move(read_buffer.buffer);
event = std::move(read_buffer.event);
buffer_offset = 0;
valid = false;
read_buffer.buffer = nullptr;
read_buffer.event = nullptr;
}
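
The move constructor above transfers two raw pointers by hand and nulls the source so the destructor stays safe. The same ownership transfer could be expressed with `std::unique_ptr`, which lets the compiler generate the move operations. This is a sketch under stated assumptions: `OwnedBuffer` and `MakeBuffers` are hypothetical names, and an `int` stands in for `daos_event_t`. A real version would presumably need a custom deleter that calls `daos_event_fini` before freeing the event.

```cpp
#include <cassert>
#include <cstddef>
#include <memory>
#include <utility>
#include <vector>

// Hypothetical stand-in for ReadBuffer; an int replaces the DAOS event.
class OwnedBuffer {
 public:
  explicit OwnedBuffer(std::size_t size)
      : buffer_(new char[size]()), event_(new int(0)), size_(size) {}

  // Defaulted moves: unique_ptr hands over ownership and nulls the source.
  // Copying is implicitly disabled because the members are move-only.
  OwnedBuffer(OwnedBuffer&&) noexcept = default;
  OwnedBuffer& operator=(OwnedBuffer&&) noexcept = default;

  bool owns_memory() const { return buffer_ != nullptr && event_ != nullptr; }
  std::size_t size() const { return size_; }

 private:
  std::unique_ptr<char[]> buffer_;
  std::unique_ptr<int> event_;
  std::size_t size_;
};

// Mirrors the constructor loop in DFSRandomAccessFile, but constructs each
// element in place instead of pushing a temporary.
std::vector<OwnedBuffer> MakeBuffers(std::size_t count, std::size_t size) {
  std::vector<OwnedBuffer> buffers;
  buffers.reserve(count);
  for (std::size_t i = 0; i < count; i++) buffers.emplace_back(size);
  return buffers;
}
```

After `OwnedBuffer moved = std::move(buffers[0]);` the source element no longer owns its memory, which matches what the hand-written constructor achieves by assigning `nullptr`.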

bool
ReadBuffer::CacheHit(size_t pos, size_t len) {
return pos >= buffer_offset && len < buffer_size && (pos+len <= buffer_offset + buffer_size);
}

int
ReadBuffer::WaitEvent() {
if(valid) return 0;
bool event_status;
daos_event_test(event, -1, &event_status);
if(event_status) {
return 0;
}
return -1;
}

int
ReadBuffer::AbortEvent() {
bool event_status = false;
daos_event_test(event, 0, &event_status);
if(!event_status)
return daos_event_abort(event);
else
return 0;
}

int
ReadBuffer::ReadAsync(dfs_t* daos_fs, dfs_obj_t* file, size_t off) {
int rc = AbortEvent();
if(rc) return rc;
d_sg_list_t rsgl;
d_iov_t iov;
d_iov_set(&iov, (void*)buffer, buffer_size);
rsgl.sg_nr = 1;
rsgl.sg_iovs = &iov;
daos_size_t read_size;
valid = false;
buffer_offset = off;
dfs_read(daos_fs, file, &rsgl,
buffer_offset, &read_size, event);
}

int
ReadBuffer::ReadSync(dfs_t* daos_fs, dfs_obj_t* file, size_t off) {
int rc = AbortEvent();
if(rc) return rc;
d_sg_list_t rsgl;
d_iov_t iov;
d_iov_set(&iov, (void*)buffer, buffer_size);
rsgl.sg_nr = 1;
rsgl.sg_iovs = &iov;
daos_size_t read_size;
valid = false;
buffer_offset = off;
rc = dfs_read(daos_fs, file, &rsgl,
off, &read_size, NULL);
valid = true;
return rc;
}

int
ReadBuffer::CopyData(char* ret, size_t off, size_t n) {
int rc = WaitEvent();
if(rc) return rc;
memcpy(ret, buffer + (off - buffer_offset), n);
return 0;
}

int
ReadBuffer::CopyFromCache(char* ret, size_t off, size_t n, daos_size_t file_size, TF_Status* status){
int rc = CopyData(ret, off, n);
if (rc) {
TF_SetStatus(status, TF_INTERNAL, "");
return 0;
}

if (off + n > file_size) {
TF_SetStatus(status, TF_OUT_OF_RANGE, "");
return file_size - off;
}

TF_SetStatus(status, TF_OK, "");
return n;
}

45 changes: 44 additions & 1 deletion tensorflow_io/core/filesystems/dfs/dfs_utils.h
@@ -9,6 +9,8 @@
#define CONT_START 43
#define PATH_START 80
#define STACK 24
#define NUM_OF_BUFFERS 2
#define BUFF_SIZE 10*1024*1024
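
`BUFF_SIZE` expands to an unparenthesized product. The uses in this commit (`offset + BUFF_SIZE`, `curr_offset += BUFF_SIZE`, and passing it as an argument) appear unaffected, but an expression with `%` or `/` directly before the macro would group differently than intended. The sketch below shows the effect and one typed alternative. `BUFF_SIZE_UNPARENTHESIZED`, `kBuffSize`, and the two helper functions are hypothetical names used only for illustration.

```cpp
#include <cassert>
#include <cstddef>

// Same shape as the macro in the diff, under a different name.
#define BUFF_SIZE_UNPARENTHESIZED 10 * 1024 * 1024

// Hypothetical typed alternative: a constant has no expansion surprises.
constexpr std::size_t kBuffSize = 10 * 1024 * 1024;

// Expands to (off % 10) * 1024 * 1024, not off % (10 * 1024 * 1024).
constexpr std::size_t OffsetInBufferMacro(std::size_t off) {
  return off % BUFF_SIZE_UNPARENTHESIZED;
}

// Computes the intended remainder.
constexpr std::size_t OffsetInBufferConst(std::size_t off) {
  return off % kBuffSize;
}

static_assert(OffsetInBufferMacro(25) == 5u * 1024 * 1024,
              "macro form multiplies the remainder");
static_assert(OffsetInBufferConst(25) == 25, "constant form is a plain modulo");
```

Wrapping the macro body in parentheses, as in `(10 * 1024 * 1024)`, would also avoid the problem while keeping the `#define` style used by the neighboring constants.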

#include <daos.h>
#include <daos_fs.h>
@@ -136,13 +138,14 @@ int ParseDFSPath(const std::string& path, std::string& pool_string,

int ParseUUID(const std::string& str, uuid_t uuid);

class DFS {
class DFS {
public:
bool connected;
dfs_t* daos_fs;
id_handle_t pool;
id_handle_t container;
std::map<std::string, pool_info_t*> pools;
daos_handle_t mEventQueueHandle{};

DFS();

@@ -200,4 +203,44 @@ class DFS {

void CopyEntries(char*** entries, std::vector<std::string>& results);


class ReadBuffer {
public:
ReadBuffer(daos_handle_t eqh, size_t size);

ReadBuffer(ReadBuffer&&);

~ReadBuffer();

bool
CacheHit(size_t pos, size_t off);

int
WaitEvent();

int
AbortEvent();

int
ReadAsync(dfs_t* dfs, dfs_obj_t* file, size_t off);

int
ReadSync(dfs_t* dfs, dfs_obj_t* file, size_t off);

int
CopyData(char* ret, size_t off, size_t n);

int
CopyFromCache(char* ret, size_t off, size_t n, daos_size_t file_size, TF_Status* status);

private:
char* buffer;
size_t buffer_offset;
size_t buffer_size;
daos_handle_t eqh;
daos_event_t* event;
bool valid;
daos_size_t read_size;
};

#endif // TENSORFLOW_IO_CORE_FILESYSTEMS_DFS_DFS_FILESYSTEM_H_