Coverage for src/bob/pipelines/dataset/protocols/archive.py: 43%

84 statements  

« prev     ^ index     » next       coverage.py v7.6.0, created at 2024-07-12 21:32 +0200

1"""Archives (tar, zip) operations like searching for files and extracting.""" 

2 

3import bz2 

4import io 

5import logging 

6import os 

7import tarfile 

8import zipfile 

9 

10from fnmatch import fnmatch 

11from pathlib import Path 

12from typing import IO, TextIO, Union 

13 

14logger = logging.getLogger(__name__) 

15 

16 

def path_and_subdir(
    archive_path: Union[str, os.PathLike],
) -> tuple[Path, Union[Path, None]]:
    """Separate an archive location from an optional inner directory.

    The inner directory, when present, is whatever follows the last ``:`` in
    the POSIX-style representation of ``archive_path``.

    Parameters
    ----------
    archive_path
        Path to an archive, optionally suffixed with ``:inner/dir``.

    Returns
    -------
    tuple
        ``(archive, inner_dir)``; ``inner_dir`` is ``None`` when
        ``archive_path`` contains no ``:``.
    """
    head, colon, tail = Path(archive_path).as_posix().rpartition(":")
    if not colon:
        # No separator at all: the whole input is the archive path.
        return Path(archive_path), None
    return Path(head), Path(tail)

26 

27 

def _is_bz2(path: Union[str, os.PathLike]) -> bool:
    """Tell whether ``path`` can be read as a bz2-compressed file.

    The check opens the file with :class:`bz2.BZ2File` and tries to
    decompress its first 1024 bytes; any ``OSError`` or ``EOFError`` raised
    while doing so means the file is not considered bz2.
    """
    try:
        with bz2.BZ2File(path) as stream:
            stream.read(1024)
    except (OSError, EOFError):
        return False
    return True

35 

36 

def is_archive(path: Union[str, os.PathLike]) -> bool:
    """Return whether ``path`` points to (or inside) a supported archive.

    Any path pointing to a valid tar or zip archive or to a valid bz2 file
    returns ``True``. An optional ``:inner/dir`` suffix is ignored: only the
    archive part of the path is tested.

    Parameters
    ----------
    path
        Path to test, optionally suffixed with ``:inner/dir``.

    Returns
    -------
    bool
        ``True`` when one of the tar, zip or bz2 checks accepts the archive
        part of ``path``; ``False`` otherwise, including when that path does
        not exist or is a directory.
    """
    # Split off the inner directory exactly once. Splitting the result a
    # second time would strip another ``:``-separated component from archive
    # paths that legitimately contain a colon and test the wrong file.
    archive = path_and_subdir(path)[0]
    try:
        return any(
            tester(archive)
            for tester in (tarfile.is_tarfile, zipfile.is_zipfile, _is_bz2)
        )
    except (FileNotFoundError, IsADirectoryError):
        return False

51 

52 

def search_and_open(
    search_pattern: str,
    archive_path: Union[str, os.PathLike],
    inner_dir: Union[os.PathLike, None] = None,
    open_as_binary: bool = False,
) -> Union[IO[bytes], TextIO, None]:
    """Return a read-only stream of the first file matching a pattern in an archive.

    The pattern is anchored at the archive's root (or at ``inner_dir`` when
    one is given) and compared to the full path of each archive member with
    :func:`fnmatch.fnmatch`. Unix shell wildcards (``*``, ``?``, ``[seq]``)
    are supported; note that ``*`` also matches across ``/``.

    The first matching file, in archive order, is opened and returned.

    Examples:

    .. code-block:: text

        archive.tar.gz
            + subdir1
            |   + file1.txt
            |   + file2.txt
            |
            + subdir2
                + file1.txt

    ``search_and_open("*/file1.txt", "archive.tar.gz")``
    opens ``archive.tar.gz/subdir1/file1.txt``

    ``search_and_open("file1.txt", "archive.tar.gz:subdir2")``
    opens ``archive.tar.gz/subdir2/file1.txt``

    ``search_and_open("*.txt", "archive.tar.gz")``
    opens ``archive.tar.gz/subdir1/file1.txt``

    Parameters
    ----------
    search_pattern
        A string to match to the file. Wildcards are supported (Unix pattern
        matching).
    archive_path
        The tar (any path with a ``.tar`` suffix, e.g. ``.tar.gz``) or
        ``.zip`` archive containing the wanted file. To match
        ``search_pattern`` in a sub path in that archive, append the sub path
        to ``archive_path`` with a ``:`` (e.g.
        ``/path/to/archive.tar.gz:sub/dir/``).
    inner_dir
        A path inside the archive under which to search. When given,
        ``archive_path`` is used as is and is not split on ``:``.
    open_as_binary
        For tar archives, return the raw binary stream instead of a UTF-8
        text wrapper. Zip members are always returned as binary streams.

    Returns
    -------
    io.TextIOBase or io.BufferedIOBase or None
        A read-only file stream, or ``None`` when no file matches.

    Raises
    ------
    ValueError
        If the archive is neither a tar nor a zip file (based on extension).
    """

    archive_path = Path(archive_path)

    if inner_dir is None:
        archive_path, inner_dir = path_and_subdir(archive_path)

    if inner_dir is not None:
        pattern = (Path("/") / inner_dir / search_pattern).as_posix()
    else:
        pattern = (Path("/") / search_pattern).as_posix()

    if ".tar" in archive_path.suffixes:
        # The returned stream reads from the archive, so the archive has to
        # stay open on success; it is closed on every other exit path.
        tar_arch = tarfile.open(archive_path)
        found = None
        try:
            for member in tar_arch:
                if member.isfile() and fnmatch("/" + member.name, pattern):
                    found = member
                    break
        except Exception:
            tar_arch.close()
            raise

        if found is None:
            tar_arch.close()
            logger.debug(
                f"No file matching '{pattern}' were found in '{archive_path}'."
            )
            return None

        stream = tar_arch.extractfile(found)
        if open_as_binary:
            return stream
        return io.TextIOWrapper(stream, encoding="utf-8")

    if archive_path.suffix == ".zip":
        zip_arch = zipfile.ZipFile(archive_path)
        for name in zip_arch.namelist():
            if fnmatch("/" + name, pattern):
                # NOTE(review): ``open_as_binary`` is not honored here; zip
                # members have always been returned as binary streams. Kept
                # as is so existing callers are not broken.
                return zip_arch.open(name)

        # No match: report it and return None instead of falling through and
        # opening whichever name happened to be listed last.
        zip_arch.close()
        logger.debug(
            f"No file matching '{pattern}' were found in '{archive_path}'."
        )
        return None

    raise ValueError(
        f"Unknown file extension '{''.join(archive_path.suffixes)}'"
    )

144 

145 

def list_dirs(
    archive_path: Union[str, os.PathLike],
    inner_dir: Union[os.PathLike, None] = None,
    show_dirs: bool = True,
    show_files: bool = True,
) -> list[Path]:
    """Return the elements directly inside an archive or one of its directories.

    Only direct children of the listed directory are returned; the listing
    is not recursive.

    Parameters
    ----------
    archive_path
        A path to an archive, or an inner directory of an archive (appended
        with a ``:``).
    inner_dir
        A path inside the archive with its root at the archive's root. Takes
        precedence over an inner directory given in ``archive_path``.
    show_dirs
        Returns directories.
    show_files
        Returns files.

    Returns
    -------
    list of pathlib.Path
        The sorted matching entries. For tar and zip archives each entry is
        the member's path prefixed with ``/``. For a ``.bz2`` file the only
        possible entry is the file's name without its ``.bz2`` suffix.

    Raises
    ------
    ValueError
        If the archive extension is not supported, or if an inner directory
        is requested for a ``.bz2`` file.
    """

    archive_path, arch_inner_dir = path_and_subdir(archive_path)
    inner_dir = Path(inner_dir or arch_inner_dir or Path("."))

    results = []
    # Read the archive info and iterate over the paths. Return the ones we want.
    if ".tar" in archive_path.suffixes:
        with tarfile.open(archive_path) as arch:
            for info in arch.getmembers():
                path = Path(info.name)
                if path.parent != inner_dir:
                    continue
                if (info.isdir() and show_dirs) or (info.isfile() and show_files):
                    results.append(Path("/") / path)
    elif archive_path.suffix == ".zip":
        with zipfile.ZipFile(archive_path) as arch:
            for zip_info in arch.infolist():
                if Path(zip_info.filename).parent != inner_dir:
                    continue
                # ZipInfo.is_dir() answers from the entry itself; building a
                # zipfile.Path from the archive path would reopen the archive
                # for every entry and never close it.
                wanted = show_dirs if zip_info.is_dir() else show_files
                if wanted:
                    results.append(Path("/") / zip_info.filename)
    elif archive_path.suffix == ".bz2":
        if inner_dir != Path("."):
            raise ValueError(
                ".bz2 files don't have an inner structure (tried to access "
                f"'{archive_path}:{inner_dir}')."
            )
        results.extend([Path(archive_path.stem)] if show_files else [])
    else:
        raise ValueError(
            f"Unsupported archive extension '{''.join(archive_path.suffixes)}'."
        )
    return sorted(results)  # Fixes inconsistent file ordering across platforms