# Copyright (c) 2023 CNES
#
# All rights reserved. Use of this source code is governed by a
# BSD-style license that can be found in the LICENSE file.
"""
File system tools
=================
"""
from __future__ import annotations
from typing import Any, Iterator, Sequence
import os
import shutil

import fsspec
#: Path separator used by this module to join path elements and to split
#: the names returned by file system listings.
SEPARATOR = '/'
[docs]
def join_path(*args: str) -> str:
    """Concatenate path elements using the module path separator.

    No normalisation is performed: the elements are joined exactly as
    given, so empty elements or elements that already contain a separator
    are kept as they are.

    Args:
        *args: path elements to join, in order

    Returns:
        The elements joined with :data:`SEPARATOR`.
    """
    elements = args
    return SEPARATOR.join(elements)
[docs]
def normalize_path(fs: fsspec.AbstractFileSystem, path: str) -> str:
    """Normalize the path.

    The protocol prefix is stripped from the path. An empty result is
    replaced by the file system separator. For the ``file`` and ``memory``
    protocols the path is additionally collapsed with
    :func:`os.path.normpath`.

    Args:
        fs: file system object
        path: path to normalize

    Returns:
        Normalized path.
    """
    # pylint: disable=protected-access
    # There is no public method to perform this operation.
    path = fs._strip_protocol(path)  # type: ignore[return-value]
    # pylint: enable=protected-access
    if path == '':
        path = fs.sep

    # ``protocol`` may be a single name or a tuple of aliases (for example
    # ``('file', 'local')``); a plain ``in`` test against the tuple of
    # supported names would never match the latter form.
    protocols = fs.protocol
    if isinstance(protocols, str):
        protocols = (protocols, )
    if any(item in ('file', 'memory') for item in protocols):
        return os.path.normpath(path)
    return path
[docs]
def get_fs(
    filesystem: fsspec.AbstractFileSystem | str | None = None
) -> fsspec.AbstractFileSystem:
    """Resolve the input into a file system object.

    A falsy input (``None`` or an empty string) selects the ``'file'``
    file system. A string is passed to :func:`fsspec.filesystem`; any
    other value is returned unchanged.

    Args:
        filesystem: file system object or file system name

    Returns:
        File system object.

    Example:
        >>> from fsspec.implementations.local import LocalFileSystem
        >>> get_fs("hdfs")
        >>> get_fs(LocalFileSystem("/tmp/swot"))
    """
    if not filesystem:
        filesystem = 'file'
    if isinstance(filesystem, str):
        return fsspec.filesystem(filesystem)
    return filesystem
[docs]
def fs_walk(
    fs: fsspec.AbstractFileSystem,
    path: str,
    sort: bool = False,
) -> Iterator[tuple[str, list[str], list[str]]]:
    """Walk a directory tree, yielding each directory before its children.

    Args:
        fs: file system object
        path: path to the directory
        sort: if True, the directories and files of each level are sorted
            alphabetically

    Yields:
        Tuples ``(path, directories, files)`` where ``path`` is the listed
        directory without trailing separator, ``directories`` holds the
        full paths of its sub-directories and ``files`` holds the base
        names of every other entry (including an entry whose name equals
        ``path`` itself). If the directory cannot be listed, a single
        ``('', [], [])`` tuple is yielded for it.
    """
    try:
        entries: list[dict[str, Any]] = fs.ls(path, detail=True)
    except OSError:
        # FileNotFoundError is a subclass of OSError, so it is covered too.
        yield '', [], []
        return

    subdirs: list[str] = []
    filenames: list[str] = []
    for entry in entries:
        # Names are expected to look like [path]/part, but a trailing
        # separator ([path]/part/) is tolerated as well.
        full_name: str = entry['name'].rstrip(SEPARATOR)
        if entry['type'] == 'directory' and full_name != path:
            # A directory other than the listed path: recurse into it later.
            subdirs.append(full_name)
            continue
        filenames.append(full_name.rsplit(SEPARATOR, 1)[-1])

    if sort:
        subdirs = sorted(subdirs)
        filenames = sorted(filenames)

    yield path.rstrip(SEPARATOR), subdirs, filenames

    # The yielded directory list is the one iterated here.
    for subdir in subdirs:
        yield from fs_walk(fs, subdir, sort=sort)
[docs]
def copy_file(
    source: str,
    target: str,
    fs_source: fsspec.AbstractFileSystem,
    fs_target: fsspec.AbstractFileSystem,
) -> None:
    """Copy a file from one location to another.

    The content is transferred in chunks, so the whole file never has to
    be held in memory at once.

    Args:
        source: The name of the source file.
        target: The name of the target file.
        fs_source: The file system that the source file is stored on.
        fs_target: The file system that the target file is stored on.
    """
    with fs_source.open(source, 'rb') as source_stream, \
            fs_target.open(target, 'wb') as target_stream:
        # Stream the bytes instead of reading the full file with read().
        shutil.copyfileobj(source_stream, target_stream)
[docs]
def copy_files(
    source: Sequence[str],
    target: str,
    fs_source: fsspec.AbstractFileSystem,
    fs_target: fsspec.AbstractFileSystem,
) -> None:
    """Copy several files into a single target directory.

    Each file keeps its base name; files are copied one after another in
    the order given.

    Args:
        source: The names of the source files.
        target: The name of the target directory.
        fs_source: The file system that the source files are stored on.
        fs_target: The file system that the target directory is stored on.
    """
    for path in source:
        destination = join_path(target, os.path.basename(path))
        copy_file(path, destination, fs_source, fs_target)
[docs]
def copy_tree(
    source: str,
    target: str,
    fs_source: fsspec.AbstractFileSystem,
    fs_target: fsspec.AbstractFileSystem,
) -> None:
    """Replicate a directory tree under a new, not yet existing target.

    The target directory is created first; then, for every directory
    reported by the walk of the source, its files are copied and its
    sub-directories are created at the same relative location under the
    target.

    Args:
        source: The name of the source directory.
        target: The name of the target directory.
        fs_source: The file system that the source directory is stored on.
        fs_target: The file system that the target directory is stored on.

    Raises:
        ValueError: If the target already exists.
    """
    if fs_target.exists(target):
        raise ValueError(f'Target {target} already exists')
    fs_target.mkdir(target)

    def mirrored(path: str) -> str:
        """Map a path below ``source`` to its counterpart below ``target``."""
        return join_path(target, os.path.relpath(path, source))

    # The walk is fully consumed before anything is copied.
    # NOTE(review): presumably so that writes cannot affect the listing
    # when both trees share a file system -- confirm.
    walked = tuple(fs_walk(fs_source, source))
    for root, directories, filenames in walked:
        for filename in filenames:
            source_path: str = join_path(root, filename)
            copy_file(source_path, mirrored(source_path), fs_source,
                      fs_target)
        for directory in directories:
            fs_target.mkdir(mirrored(directory))