Source code for general_tools.file_utils

from __future__ import unicode_literals, print_function
import codecs
import json
import os
import zipfile
import sys
import shutil
import yaml
from mimetypes import MimeTypes
from libraries.general_tools.data_utils import json_serial

# we need this to check for string versus object
PY3 = sys.version_info[0] == 3

if PY3:
    string_types = str,
else:
    # noinspection PyCompatibility
    string_types = basestring,


[docs]def unzip(source_file, destination_dir): """ Unzips <source_file> into <destination_dir>. :param str|unicode source_file: The name of the file to read :param str|unicode destination_dir: The name of the directory to write the unzipped files """ with zipfile.ZipFile(source_file) as zf: zf.extractall(destination_dir)
[docs]def add_contents_to_zip(zip_file, path, include_root=False): """ Zip the contents of <path> into <zip_file>. :param str|unicode zip_file: The file name of the zip file :param str|unicode path: Full path of the directory to zip up :param bool include_root: If true, the zip file will start with the directory of the path parameter """ path = path.rstrip(os.path.sep) if include_root: path_start_index = len(os.path.dirname(path))+1 else: path_start_index = len(path)+1 with zipfile.ZipFile(zip_file, 'a') as zf: for root, dirs, files in os.walk(path): for f in files: file_path = os.path.join(root, f) zf.write(file_path, file_path[path_start_index:])
[docs]def add_file_to_zip(zip_file, file_name, arc_name=None, compress_type=None): """ Zip <file_name> into <zip_file> as <arc_name>. :param str|unicode zip_file: The file name of the zip file :param str|unicode file_name: The name of the file to add, including the path :param str|unicode arc_name: The new name, with directories, of the file, the same as filename if not given :param str|unicode compress_type: """ with zipfile.ZipFile(zip_file, 'a') as zf: zf.write(file_name, arc_name, compress_type)
[docs]def make_dir(dir_name, linux_mode=0o755, error_if_not_writable=False): """ Creates a directory, if it doesn't exist already. If the directory does exist, and <error_if_not_writable> is True, the directory will be checked for writability. :param str|unicode dir_name: The name of the directory to create :param int linux_mode: The mode/permissions to set for the new directory expressed as an octal integer (ex. 0o755) :param bool error_if_not_writable: The name of the file to read """ if not os.path.exists(dir_name): os.makedirs(dir_name, linux_mode) elif error_if_not_writable: if not os.access(dir_name, os.R_OK | os.W_OK | os.X_OK): raise IOError('Directory {0} is not writable.'.format(dir_name))
[docs]def load_json_object(file_name, default=None): """ Deserialized JSON file <file_name> into a Python dict. :param str|unicode file_name: The name of the file to read :param default: The value to return if the file is not found """ if not os.path.isfile(file_name): return default # return a deserialized object return json.loads(read_file(file_name))
[docs]def load_yaml_object(file_name, default=None): """ Deserialized YAML file <file_name> into a Python dict. :param str|unicode file_name: The name of the file to read :param default: The value to return if the file is not found """ if not os.path.isfile(file_name): return default # return a deserialized object return yaml.load(read_file(file_name))
[docs]def read_file(file_name, encoding='utf-8-sig'): with codecs.open(file_name, 'r', encoding=encoding) as f: content = f.read() # convert Windows line endings to Linux line endings content = content.replace('\r\n', '\n') return content
[docs]def write_file(file_name, file_contents, indent=None): """ Writes the <file_contents> to <file_name>. If <file_contents> is not a string, it is serialized as JSON. :param str|unicode file_name: The name of the file to write :param str|unicode|object file_contents: The string to write or the object to serialize :param int indent: Specify a value if you want the output formatted to be more easily readable """ # make sure the directory exists make_dir(os.path.dirname(file_name)) if isinstance(file_contents, string_types): text_to_write = file_contents else: if os.path.splitext(file_name)[1] == '.yaml': text_to_write = yaml.safe_dump(file_contents) else: text_to_write = json.dumps(file_contents, sort_keys=True, indent=indent, default=json_serial) with codecs.open(file_name, 'w', encoding='utf-8') as out_file: out_file.write(text_to_write)
[docs]def get_mime_type(path): mime = MimeTypes() mime_type = mime.guess_type(path)[0] if not mime_type: mime_type = "text/{0}".format(os.path.splitext(path)[1]) return mime_type
[docs]def get_files(directory, relative_paths=False, include_directories=False, topdown=False, extensions=None, exclude=None): file_list = [] for root, dirs, files in os.walk(directory, topdown=topdown): if exclude and (os.path.basename(root) in exclude or os.path.basename(root).lower() in exclude): continue if relative_paths: path = root[len(directory)+1:] else: path = root for filename in files: if (not exclude or (filename not in exclude and filename.lower() not in exclude)) and \ (not extensions or os.path.splitext(filename)[1] in extensions or os.path.splitext(filename)[1].lower() in extensions): file_list.append(os.path.join(path, filename)) if include_directories: for dir_name in dirs: file_list.append(os.path.join(path, dir_name)) return file_list
[docs]def get_subdirs(dir, relative_paths=False, topdown=False): dir_list = [] for root, dirs, files in os.walk(dir, topdown=topdown): if relative_paths: path = os.path.relpath(root, dir) else: path = root for dirname in dirs: dir_list.append(os.path.join(path, dirname)) return dir_list
[docs]def copy_tree(src, dst, symlinks=False, ignore=None): """ Recursively copy a directory and all subdirectories. Parameters same as shutil.copytree :param src: :param dst: :param symlinks: :param ignore: :return: """ if not os.path.exists(dst): os.makedirs(dst) for item in os.listdir(src): s = os.path.join(src, item) d = os.path.join(dst, item) if os.path.isdir(s): copy_tree(s, d, symlinks, ignore) else: # only replace file if modified if not os.path.exists(d) or os.stat(s).st_mtime - os.stat(d).st_mtime > 1: shutil.copy2(s, d)
[docs]def remove_tree(dir_path, ignore_errors=True): if os.path.isdir(dir_path): shutil.rmtree(dir_path, ignore_errors=ignore_errors)
[docs]def remove(file_path, ignore_errors=True): if ignore_errors: try: os.remove(file_path) except OSError: pass else: os.remove(file_path)