Coverage for src/bob/pipelines/dataset/protocols/archive.py: 43%
84 statements
« prev ^ index » next coverage.py v7.0.5, created at 2023-06-16 14:21 +0200
« prev ^ index » next coverage.py v7.0.5, created at 2023-06-16 14:21 +0200
1"""Archives (tar, zip) operations like searching for files and extracting."""
3import bz2
4import io
5import logging
6import os
7import tarfile
8import zipfile
10from fnmatch import fnmatch
11from pathlib import Path
12from typing import IO, TextIO, Union
14logger = logging.getLogger(__name__)
def path_and_subdir(
    archive_path: Union[str, os.PathLike],
) -> tuple[Path, Union[Path, None]]:
    """Separates an archive location from an optional inner directory.

    The inner directory, when present, follows the last ``:`` of the path
    (e.g. ``/path/to/archive.tar.gz:sub/dir``).

    Parameters
    ----------
    archive_path
        Path to an archive, optionally suffixed with ``:<inner directory>``.

    Returns
    -------
    tuple
        ``(archive, inner_dir)``; ``inner_dir`` is ``None`` when the path
        contains no ``:``.
    """
    posix_form = Path(archive_path).as_posix()
    # Partition on the LAST colon: everything after it is the inner directory.
    head, colon, tail = posix_form.rpartition(":")
    if not colon:
        return Path(archive_path), None
    return Path(head), Path(tail)
def _is_bz2(path: Union[str, os.PathLike]) -> bool:
    """Returns whether ``path`` can be opened and read as a bz2 stream.

    A small chunk is decompressed as a probe; any ``OSError`` (which includes
    a missing file) or ``EOFError`` raised while doing so means "not bz2".
    """
    try:
        with bz2.BZ2File(path) as compressed:
            # Decompressing the first bytes is enough to validate the stream.
            compressed.read(1024)
    except (OSError, EOFError):
        return False
    else:
        return True
def is_archive(path: Union[str, os.PathLike]) -> bool:
    """Returns whether the path points to (or into) an archive.

    Any path pointing to a valid tar or zip archive or to a valid bz2
    file will return ``True``. An inner directory appended with ``:`` is
    stripped before testing; its existence inside the archive is not checked.

    Parameters
    ----------
    path
        Path to test, optionally suffixed with ``:<inner directory>``.

    Returns
    -------
    bool
        ``True`` if the archive part of ``path`` is a tar, zip or bz2 file;
        ``False`` otherwise, including when it is missing or is a directory.
    """
    # Split only ONCE: splitting the already-split archive path again would
    # truncate archive paths that legitimately contain a ``:``.
    archive = path_and_subdir(path)[0]
    try:
        return any(
            tester(archive)
            for tester in (tarfile.is_tarfile, zipfile.is_zipfile, _is_bz2)
        )
    except (FileNotFoundError, IsADirectoryError):
        return False
def search_and_open(
    search_pattern: str,
    archive_path: Union[str, os.PathLike],
    inner_dir: Union[os.PathLike, None] = None,
    open_as_binary: bool = False,
) -> Union[IO[bytes], TextIO, None]:
    """Returns a read-only stream of a file matching a pattern in an archive.

    Wildcards are supported with Unix shell-style matching (using
    :func:`fnmatch.fnmatch`); note that ``*`` also matches path separators.
    The pattern is anchored at the archive's root, or at the inner directory
    when one is given.

    The first matching file (in the archive's own order) will be opened and
    returned.

    examples:

    .. code-block:: text

        archive.tar.gz
            + subdir1
            |   + file1.txt
            |   + file2.txt
            |
            + subdir2
                + file1.txt

    ``search_and_open("*.txt", "archive.tar.gz")``
    opens ``archive.tar.gz/subdir1/file1.txt``

    ``search_and_open("file1.txt", "archive.tar.gz:subdir2")``
    opens ``archive.tar.gz/subdir2/file1.txt``

    ``search_and_open("file1.txt", "archive.tar.gz")``
    returns ``None`` (there is no ``file1.txt`` at the archive's root)

    Parameters
    ----------
    search_pattern
        A string to match to the file. Wildcards are supported (Unix pattern
        matching).
    archive_path
        The ``.tar*`` or ``.zip`` archive file containing the wanted file. To
        match ``search_pattern`` in a sub path in that archive, append the sub
        path to ``archive_path`` with a ``:`` (e.g.
        ``/path/to/archive.tar.gz:sub/dir/``).
    inner_dir
        A path inside the archive in which to search. When given,
        ``archive_path`` is used as is (no ``:`` splitting is done).
    open_as_binary
        Returns a binary stream when ``True``; otherwise the stream is
        decoded as UTF-8 text.

    Returns
    -------
    io.TextIOBase or io.BufferedIOBase or None
        A read-only file stream, or ``None`` if no file matches. The archive
        is left open when a stream is returned, since the stream reads from
        it.

    Raises
    ------
    ValueError
        If the archive is neither a ``.tar*`` nor a ``.zip`` file.
    """
    archive_path = Path(archive_path)

    if inner_dir is None:
        archive_path, inner_dir = path_and_subdir(archive_path)

    # Anchor the pattern at "/" so that it can be compared against member
    # names that are prefixed with "/" below.
    if inner_dir is not None:
        pattern = (Path("/") / inner_dir / search_pattern).as_posix()
    else:
        pattern = (Path("/") / search_pattern).as_posix()

    stream: IO[bytes]
    if ".tar" in archive_path.suffixes:
        tar_arch = tarfile.open(archive_path)
        member = next(
            (
                candidate
                for candidate in tar_arch
                if candidate.isfile() and fnmatch("/" + candidate.name, pattern)
            ),
            None,
        )
        if member is None:
            # Nothing will read from the archive: release the handle now.
            tar_arch.close()
            logger.debug(
                f"No file matching '{pattern}' were found in '{archive_path}'."
            )
            return None
        stream = tar_arch.extractfile(member)
    elif archive_path.suffix == ".zip":
        zip_arch = zipfile.ZipFile(archive_path)
        name = next(
            (
                candidate
                for candidate in zip_arch.namelist()
                # Directory entries end with "/"; only files can be opened.
                if not candidate.endswith("/")
                and fnmatch("/" + candidate, pattern)
            ),
            None,
        )
        if name is None:
            # Without this early return the last inspected entry (or an
            # unbound name for an empty archive) would be opened instead.
            zip_arch.close()
            logger.debug(
                f"No file matching '{pattern}' were found in '{archive_path}'."
            )
            return None
        stream = zip_arch.open(name)
    else:
        raise ValueError(
            f"Unknown file extension '{''.join(archive_path.suffixes)}'"
        )

    # Both archive types yield binary streams; decode unless bytes are wanted.
    if open_as_binary:
        return stream
    return io.TextIOWrapper(stream, encoding="utf-8")
def list_dirs(
    archive_path: Union[str, os.PathLike],
    inner_dir: Union[os.PathLike, None] = None,
    show_dirs: bool = True,
    show_files: bool = True,
) -> list[Path]:
    """Returns a list of all the elements in an archive or inner directory.

    Only the direct children of the listed directory are returned (the
    listing is not recursive).

    Parameters
    ----------
    archive_path
        A path to an archive, or an inner directory of an archive (appended
        with a ``:``).
    inner_dir
        A path inside the archive with its root at the archive's root. Takes
        precedence over an inner directory given in ``archive_path``.
    show_dirs
        Returns directories.
    show_files
        Returns files.

    Returns
    -------
    list of pathlib.Path
        The sorted matching entries. Entries of tar and zip archives are
        prefixed with ``/``; a ``.bz2`` file yields its name without the
        ``.bz2`` suffix.

    Raises
    ------
    ValueError
        If the archive extension is not supported, or if an inner directory
        is requested for a ``.bz2`` file.
    """
    archive_path, arch_inner_dir = path_and_subdir(archive_path)
    inner_dir = Path(inner_dir or arch_inner_dir or Path("."))

    results = []
    # Read the archive info and iterate over the paths. Return the ones we want.
    if ".tar" in archive_path.suffixes:
        with tarfile.open(archive_path) as arch:
            for info in arch.getmembers():
                path = Path(info.name)
                if path.parent != inner_dir:
                    continue
                if (info.isdir() and show_dirs) or (
                    info.isfile() and show_files
                ):
                    results.append(Path("/") / path)
    elif archive_path.suffix == ".zip":
        with zipfile.ZipFile(archive_path) as arch:
            for zip_info in arch.infolist():
                if Path(zip_info.filename).parent != inner_dir:
                    continue
                # ZipInfo already knows whether it is a directory; building a
                # zipfile.Path from the archive path would reopen the archive
                # for every single entry.
                wanted = show_dirs if zip_info.is_dir() else show_files
                if wanted:
                    results.append(Path("/") / zip_info.filename)
    elif archive_path.suffix == ".bz2":
        if inner_dir != Path("."):
            raise ValueError(
                ".bz2 files don't have an inner structure (tried to access "
                f"'{archive_path}:{inner_dir}')."
            )
        # A bz2 file holds a single stream, named after the file itself.
        results.extend([Path(archive_path.stem)] if show_files else [])
    else:
        raise ValueError(
            f"Unsupported archive extension '{''.join(archive_path.suffixes)}'."
        )
    return sorted(results)  # Fixes inconsistent file ordering across platforms