-
Notifications
You must be signed in to change notification settings - Fork 707
/
fileio.py
126 lines (91 loc) · 4.17 KB
/
fileio.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
# Copyright 2020 Google LLC. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Pluggable file I/O interface for use in TFX system and components."""
from typing import Any, Callable, Iterable, List, Optional, Tuple, Type
from tfx.dsl.io import filesystem
from tfx.dsl.io import filesystem_registry
from tfx.dsl.io.filesystem import PathType
# Import modules that may provide filesystem plugins.
import tfx.dsl.io.plugins.tensorflow_gfile # pylint: disable=unused-import, g-import-not-at-top
import tfx.dsl.io.plugins.local # pylint: disable=unused-import, g-import-not-at-top
# Expose `NotFoundError` as `fileio.NotFoundError`.
NotFoundError = filesystem.NotFoundError
def _get_filesystem(path) -> Type[filesystem.Filesystem]:
return (filesystem_registry.DEFAULT_FILESYSTEM_REGISTRY
.get_filesystem_for_path(path))
def open(path: PathType, mode: str = 'r'): # pylint: disable=redefined-builtin
"""Open a file at the given path."""
return _get_filesystem(path).open(path, mode=mode)
def copy(src: PathType, dst: PathType, overwrite: bool = False) -> None:
"""Copy a file from the source to the destination."""
src_fs = _get_filesystem(src)
dst_fs = _get_filesystem(dst)
if src_fs is dst_fs:
src_fs.copy(src, dst, overwrite=overwrite)
else:
if not overwrite and exists(dst):
raise OSError(
('Destination file %r already exists and argument `overwrite` is '
'false.') % dst)
with open(src, mode='rb') as f_src:
contents = f_src.read()
with open(dst, mode='wb') as f_dst:
f_dst.write(contents)
def exists(path: PathType) -> bool:
"""Return whether a path exists."""
return _get_filesystem(path).exists(path)
def glob(pattern: PathType) -> List[PathType]:
"""Return the paths that match a glob pattern."""
return _get_filesystem(pattern).glob(pattern)
def isdir(path: PathType) -> bool:
"""Return whether a path is a directory."""
return _get_filesystem(path).isdir(path)
def listdir(path: PathType) -> List[PathType]:
"""Return the list of files in a directory."""
return _get_filesystem(path).listdir(path)
def makedirs(path: PathType) -> None:
"""Make a directory at the given path, recursively creating parents."""
_get_filesystem(path).makedirs(path)
def mkdir(path: PathType) -> None:
"""Make a directory at the given path; parent directory must exist."""
_get_filesystem(path).mkdir(path)
def remove(path: PathType) -> None:
"""Remove the file at the given path."""
_get_filesystem(path).remove(path)
def rename(src: PathType, dst: PathType, overwrite: bool = False) -> None:
"""Rename a source file to a destination path."""
src_fs = _get_filesystem(src)
dst_fs = _get_filesystem(dst)
if src_fs is dst_fs:
src_fs.rename(src, dst, overwrite=overwrite)
else:
raise NotImplementedError(
('Rename from %r to %r using different filesystems plugins is '
'currently not supported.') % (src, dst))
def rmtree(path: PathType) -> None:
"""Remove the given directory and its recursive contents."""
_get_filesystem(path).rmtree(path)
def stat(path: PathType) -> Any:
"""Return the stat descriptor for a given file path."""
return _get_filesystem(path).stat(path)
def walk(
top: PathType,
topdown: bool = True,
onerror: Optional[Callable[..., None]] = None
) -> Iterable[Tuple[PathType, List[PathType], List[PathType]]]:
"""Return an iterator walking a directory tree."""
return _get_filesystem(top).walk(top, topdown=topdown, >
def get_inline_filename(data: str, compressed: bool = False) -> str:
"""Return a path for an inline file with the given content."""
return _get_filesystem('/inline').get_inline_filename(data, compressed)