Source code for flame.utils

import os, logging, subprocess, platform, yaml, re
from logging import Logger
from typing import Union, Any
from types import NoneType

import numpy as np
from numpy.typing import NDArray
from natsort import natsorted

from .error import FLAMEDtypeError, CAREDatasetError, FLAMEMLFlowError, FLAMEPyMatlabError

LOGGER = logging.getLogger("UTIL")

def _int_or_int_array(
        data: Any, 
        logger: Union[Logger, NoneType]=None,
        dtype: Union[int, np.integer]=int, # type: ignore
        accept_nonetype: bool=False,
    ) -> Union[NDArray[np.integer], np.integer, int]:
    """
    If data can be turned into an integer array, it will.
    If the data can't but matches dtype int, it will return a single integer.
    """
    if data is None and accept_nonetype:
        return data
    
    try:
        proc_data = dtype(data) # type: ignore
    except TypeError as e:
        try:
            proc_data = np.array(data, dtype=dtype) # type: ignore
        except ValueError as e:
            if logger is not None:
                logger.exception(f"Could not convert {data} to int or int array.\nERROR: {e}")
            raise FLAMEDtypeError(f"Could not convert {data} to int or int array.\nERROR: {e}")
    return proc_data


def _validate_int(
        data: Any, 
        logger: Union[Logger, NoneType]=None,
        dtype: Union[int, np.integer]=int, # type: ignore
        accept_nonetype: bool=False,
        accept_float: bool=False,
    ) -> Union[np.integer, int]:
    """
    Will ensure input data is an array.
    Will return None if input data is none and accept_nonetype is True.
    """
    if data is None and accept_nonetype:
        return data
    
    try:
        proc_data = dtype(data) # type: ignore
        if not accept_float:
            assert not np.issubdtype(type(data), np.floating), f"Data {data} is float, but accept_float is False"
    except TypeError as e:
        if logger is not None:
            logger.exception(f"Could not convert {data} to integer.\nERROR: {e}")
        raise FLAMEDtypeError(f"Could not convert {data} to integer.\nERROR: {e}")
    return proc_data


def _validate_int_greater_than_zero(
        data: Any, 
        logger: Union[Logger, NoneType]=None,
        dtype: Union[int, np.integer]=int, # type: ignore
        accept_nonetype: bool=False,
        accept_float: bool=False,
    ) -> Union[np.integer, int]:
    """
    Convert 'data' to an integer and check that the result is greater than zero.

    Runs '_validate_int' followed by '_validate_is_greater_than_zero' and
    re-raises a FLAMEDtypeError from either stage with a stage-specific message.

    Args:
        - data (Any): Value to validate.
        - logger (Logger, None): Optional logger, also handed to both validation stages.
        - dtype: Integer type forwarded to '_validate_int'. DEFAULT: int.
        - accept_nonetype (bool): Forwarded to both validation stages. DEFAULT: False.
        - accept_float (bool): Forwarded to '_validate_int'. DEFAULT: False.

    Returns:
        - The value returned by '_validate_is_greater_than_zero'.

    Raises:
        - FLAMEDtypeError: If either stage raises a FLAMEDtypeError.
    """
    # Stage 1: integer conversion.
    try:
        as_int = _validate_int(
            data=data,
            logger=logger,
            dtype=dtype,
            accept_nonetype=accept_nonetype,
            accept_float=accept_float,
        )
    except FLAMEDtypeError:
        not_int_message = f"Data {data} was not an integer or could not be cast as an integer."
        if logger is not None:
            logger.exception(not_int_message)
        raise FLAMEDtypeError(not_int_message)

    # Stage 2: positivity check on the converted value.
    try:
        return _validate_is_greater_than_zero(
            data=as_int,
            logger=logger,
            accept_nonetype=accept_nonetype,
        )
    except FLAMEDtypeError:
        if logger is not None:
            logger.exception(f"Data {as_int} was an integer, but was not greater than 0")
        raise FLAMEDtypeError(f"Data {as_int} was an integer, but was not positive")
        

def _float_or_float_array(
        data: Any, 
        logger: Union[Logger, NoneType]=None, 
        dtype: Union[float, np.floating]=float, # type: ignore
        accept_nonetype: bool=False
    ) -> Union[NDArray[np.floating], np.floating, float]:
    """
    If data can be turned into a floating point array, it will.
    If the data can't but matches dtype float, it will return a single integer.
    """
    if data is None and accept_nonetype:
        return data
    
    try:
        proc_data = dtype(data) # type: ignore
    except TypeError as e:
        try:
            proc_data = np.array(data, dtype=dtype) # type: ignore
        except ValueError as e:
            if logger is not None:
                logger.exception(f"Could not convert {data} to float or float array.\nERROR: {e}")
            raise FLAMEDtypeError(f"Could not convert {data} to float or float array.\nERROR: {e}")
    return proc_data


def _validate_is_greater_than_zero(
        data: Union[np.integer, np.floating, NoneType], 
        logger: Union[Logger, NoneType]=None,
        accept_nonetype: bool=True
    ) -> Union[np.integer, np.floating, NoneType]:
    """
    If a number is passed to 'data', this will raise an error if the number is below 1.
    """
    try:
        if not accept_nonetype:
            assert data is not None, f"Data {data} is NoneType, but 'accept_nonetype' is False"
        
        # return None if data is none, otherwise errors ensue
        if data is None:
            return None
        
        assert data > 0, f"Data {data} was not greater than 0"
    except AssertionError as e:
        if logger is not None:
            logger.exception(f"Could not validate {data} is greater than zero.\nERROR: {e}")
        raise FLAMEDtypeError(f"Could not validate {data} is greater than zero.\nERROR: {e}")
    return data


def min_max_norm(
        arr: NDArray,
        mini: Union[NDArray, list, int, float],
        maxi: Union[NDArray, list, int, float],
        sigma: float=1e-20,
        dtype: Union[np.floating[Any], np.integer[Any]]=np.float32 # type: ignore
    ) -> NDArray[Union[np.floating, np.integer]]:
    """
    Min-max normalize 'arr' using the provided 'mini' and 'maxi'.

    If either 'mini' or 'maxi' is an array or a list, both are converted with
    'np.array' and the work is delegated to '_min_max_norm_array'. Otherwise
    the bounds are applied directly as '(arr - mini) / (maxi - mini + sigma)'.

    Args:
        - arr (NDArray): Array to normalize.
        - mini: Lower bound, either a single number or one value per entry along an axis.
        - maxi: Upper bound, either a single number or one value per entry along an axis.
        - sigma (float): Constant added to the denominator. DEFAULT: 1e-20.
        - dtype: Output dtype. Only forwarded to '_min_max_norm_array'; the
          scalar path does not cast its result. DEFAULT: np.float32.

    Returns:
        - The normalized array.
    """
    sequence_types = (np.ndarray, list)
    has_sequence_bound = isinstance(mini, sequence_types) or isinstance(maxi, sequence_types)

    if not has_sequence_bound:
        return (arr - mini) / (maxi - mini + sigma)

    return _min_max_norm_array(
        arr=arr,
        mini=np.array(mini),
        maxi=np.array(maxi),
        sigma=sigma,
        dtype=dtype,
    )
def _min_max_norm_array(
        arr: NDArray,
        mini: NDArray[Union[np.floating, np.integer]],
        maxi: NDArray[Union[np.floating, np.integer]],
        sigma: float=1e-20,
        dtype: Union[np.floating, np.integer]=np.float32 # type: ignore
    ) -> NDArray[Union[np.floating, np.integer]]:
    """
    Min-max normalize 'arr' with one 'mini' / 'maxi' pair per entry along an axis.

    'mini' and 'maxi' must be 1 dimensional and of equal length, and that
    length must equal the size of at least one axis of 'arr'. The first axis of
    'arr' with that size is the one normalized.

    Args:
        - arr (NDArray): Array to normalize.
        - mini (NDArray): 1 dimensional array of lower bounds.
        - maxi (NDArray): 1 dimensional array of upper bounds.
        - sigma (float): Constant added to the denominator. DEFAULT: 1e-20.
        - dtype: dtype the normalized values are cast to. DEFAULT: np.float32.

    Returns:
        - Normalized array with the same axis order as 'arr'.

    Raises:
        - AssertionError: If the shape requirements above are not met.
    """
    assert mini.ndim == 1 and maxi.ndim == 1
    assert len(mini) == len(maxi)
    assert len(mini) in arr.shape

    # First axis whose size matches the number of bounds.
    matched_axis = arr.shape.index(len(mini))

    # Put the matched axis last so the 1 dimensional bounds broadcast against it.
    moved = np.moveaxis(arr, matched_axis, -1)
    scaled = ((moved - mini) / (maxi - mini + sigma)).astype(dtype)

    # Restore the original axis order.
    return np.moveaxis(scaled, -1, matched_axis)


def _apply_bidirectional_correction(img: NDArray, corr: Union[np.integer, int]):
    """
    Offset every second row of 'img' by 'corr' columns and crop the last axis.

    Rows are selected with '::2' on the second-to-last axis and columns are the
    last axis. For a negative 'corr' the selected rows are copied |corr|
    columns towards higher indices and the first |corr| columns are dropped.
    For a positive 'corr' the selected rows are copied 'corr' columns towards
    lower indices and the last 'corr' columns are dropped. A 'corr' of 0
    returns 'img' untouched.

    NOTE: The row copy writes into 'img' itself, so the caller's array is
    modified. The returned array is a cropped slice of it.

    Args:
        - img (NDArray): Image data with at least 2 dimensions.
        - corr (int): Signed column offset.

    Returns:
        - The cropped array, or 'img' itself when 'corr' is 0.
    """
    if corr < 0:
        offset = np.abs(corr)
        img[..., ::2, offset:] = img[..., ::2, :corr]
        return img[..., offset:]

    if corr > 0:
        img[..., ::2, :-1 * corr] = img[..., ::2, corr:]
        return img[..., :-1 * corr]

    # Nothing to do when the correction is 0.
    return img
def is_iterable(obj):
    """
    Report whether 'iter(obj)' succeeds.

    Args:
        - obj: Any object.

    Returns:
        - True if 'iter(obj)' returns without raising, False if it raises an Exception.
    """
    try:
        iter(obj)
    except Exception:
        # Deliberately not a bare 'except': KeyboardInterrupt and SystemExit
        # must propagate instead of being reported as "not iterable".
        return False
    return True
def _compress_dict_fields(data: dict) -> dict:
    """
    Description:
        Convert a many-dimensional dictionary into a single-dimensional dictionary.
        Nested keys are joined with '-'. An empty nested dictionary contributes
        no entries to the output.

    Example:
        my_dict = {
            'A': {
                '1': 'foo',
                '2': {
                    'p': 'bar',
                    'q': 'lorem'
                }
            },
            'B': 'ipsum'
        }
        _compress_dict_fields(my_dict) -> {
            'A-1': 'foo',
            'A-2-p': 'bar',
            'A-2-q': 'lorem',
            'B': 'ipsum'
        }

    Raises:
        - AssertionError: If a key contains '-', which is reserved as the separator.
    """
    new_data = {}
    for k, v in data.items():
        assert '-' not in k, f"Compression mechanism relies on '-' as separator. Remove '-' from {k} and try again!"
        if isinstance(v, dict):
            # Flatten the nested dictionary first, then prefix its keys.
            for sub_k, sub_v in _compress_dict_fields(v).items():
                new_data[f"{k}-{sub_k}"] = sub_v
        else:
            new_data[k] = v
    return new_data


def _expand_dict_fields(data: dict) -> dict:
    """
    Description:
        Reverse of _compress_dict_fields(). Every key is split on '-' and the
        value is stored under the resulting chain of nested dictionaries.

    Example:
        _expand_dict_fields({'A-1': 'foo', 'A-2-p': 'bar', 'B': 'ipsum'}) -> {
            'A': {
                '1': 'foo',
                '2': {
                    'p': 'bar'
                }
            },
            'B': 'ipsum'
        }

    Raises:
        - ValueError: If one key requires a nested dictionary at a position
          where another key already stored a plain value (e.g. 'A' and 'A-1').
    """
    new_data = {}
    for k, v in data.items():
        *parents, leaf = k.split('-')
        node = new_data
        for part in parents:
            node = node.setdefault(part, {})
            if not isinstance(node, dict):
                raise ValueError(f"Cannot expand key {k}: '{part}' already holds a non-dictionary value.")
        node[leaf] = v
    return new_data
def set_up_tracking_server(ip: str, port: str, direc: str, log_path: str) -> subprocess.Popen:
    """
    Set up tracking server by spawning up parallel process.

    Before the server is started, 'update_yaml_artifact_path' is run on 'direc'.

    Args:
        - ip (str): IP where to host the MLFlow server (recommend 127.0.0.1 a.k.a. localhost)
        - port (str): Port at IP where to host the MLFlow server
        - direc (str): The directory where mlflow run data & associated artifacts are stored. Typically 'mlruns'
        - log_path (str): Path of the file the mlflow server's stdout and stderr are written to.
          The file is opened in "w+" mode, so existing content is truncated.

    Returns:
        - proc (subprocess.Popen): Process where the server is being hosted

    Raises:
        - FLAMEMLFlowError: If the server process could not be started.
    """
    store_uri = f"file:{os.path.sep*3}{direc}"
    server_command = [
        "mlflow", "server",
        "--host", ip,
        "--port", port,
        "--serve-artifacts",
        "--backend-store-uri", store_uri,
        "--default-artifact-root", store_uri,
        "--artifacts-destination", store_uri,
    ]
    update_yaml_artifact_path(direc)
    LOGGER.info(f"Starting MLFLOW server with command:\n`{' '.join(server_command)}`")

    # The child process gets its own copy of the log file descriptor, so this
    # process can release its handle as soon as the child has been spawned.
    # The 'with' block also releases the handle when spawning fails.
    with open(log_path, "w+") as server_log:
        LOGGER.info(f"Starting MLFLOW server log at path {server_log.name}")
        try:
            proc = subprocess.Popen(
                server_command,
                stdout=server_log,
                stderr=server_log
            )
        except Exception as e:
            LOGGER.error(f"Problem starting MLFlow Server process.\n{e.__class__.__name__}: {e}")
            raise FLAMEMLFlowError(f"Problem starting MLFlow Server process.\n{e.__class__.__name__}: {e}") from e
    return proc
def change_root(root: str, to_change: str) -> str:
    """
    Rebase the path 'to_change' onto 'root'.

    'root' is split on 'os.path.sep' and 'to_change' on '/' or a backslash. The
    last component of 'root' is looked up in 'to_change' (first occurrence);
    everything after it in 'to_change' is appended to 'root'.

    If the last component of 'root' does not occur in 'to_change', 'root'
    itself is returned, provided it is an existing directory.

    Args:
        - root (str): Path whose last component marks the common directory.
        - to_change (str): Path to rebase.

    Returns:
        - The rebased path joined with 'os.path.sep', or 'root' (see above).

    Raises:
        - AssertionError: If no common component is found and 'root' is not a directory.
    """
    try:
        root_parts = root.split(os.path.sep)
        target_parts = re.split("/|\\\\", to_change)
        # Position of the shared directory name inside 'to_change'.
        anchor = target_parts.index(root_parts[-1])
        return os.path.sep.join(root_parts + target_parts[anchor + 1:])
    except ValueError as e:
        try:
            assert os.path.isdir(root)
            return root
        except BaseException:
            # Report both inputs before passing the failure on unchanged.
            print(root, to_change)
            LOGGER.error(f"ROOT: {root}, TO_CHANGE: {to_change}")
            raise
def update_yaml_artifact_path(mlrun_direc: str) -> None:
    """
    Will search through the provided directory and update the 'artifact_uri'
    (or, if absent, the 'artifact_location') within the yaml files it contains.
    This is to fix issues with MLFLOW where absolute paths are used for all artifact URIs.

    Files ending in '.yaml' or '.yml' are rewritten in place. Files containing
    neither key, and files whose content is not a mapping (e.g. empty files),
    are left untouched.

    Args:
        - mlrun_direc (str): Directory that is walked recursively.

    Returns:
        None. Matching files are updated on disk.

    Raises:
        - AssertionError: If 'mlrun_direc' is not a directory.
    """
    assert os.path.isdir(mlrun_direc), f"Input path {mlrun_direc} must be a directory."
    for root, _dirs, files in os.walk(mlrun_direc):
        for f in files:
            if os.path.splitext(f)[1] not in (".yaml", ".yml"):
                continue
            this_path = os.path.join(root, f)

            # Context managers close each handle promptly; the write in
            # particular must be flushed before the file is read again.
            with open(this_path, 'r') as handle:
                yml = yaml.safe_load(handle)

            # safe_load returns None for an empty file and may return a list
            # or scalar; only mappings can carry the keys updated below.
            if not isinstance(yml, dict):
                continue

            if 'artifact_uri' in yml:
                artifact_path = yml['artifact_uri']
                yml['artifact_uri'] = f"file:{os.path.sep*3}{change_root(root, artifact_path)}"
            elif 'artifact_location' in yml:
                artifact_path = yml['artifact_location']
                yml['artifact_location'] = change_root(root, artifact_path)
            else:
                continue

            with open(this_path, "w") as handle:
                yaml.dump(yml, handle)
def get_windows_user_path() -> str:
    """
    Return the Windows user profile directory as a WSL path.

    Runs 'wslvar USERPROFILE' to read the Windows path and passes the result
    to 'wslpath' for conversion. The exit codes of both commands are not checked.

    Returns:
        - The stripped, UTF-8 decoded output of 'wslpath'.

    Raises:
        - AssertionError: If 'on_wsl()' returns False.
    """
    assert on_wsl(), "Must be on WSL to get Windows user path"

    # subprocess.run waits for each command to finish and closes its pipe, so
    # no child process or file descriptor is left behind.
    profile = subprocess.run(
        ["wslvar", "USERPROFILE"],
        stdout=subprocess.PIPE
    )
    windows_path = profile.stdout.decode("UTF-8").strip()

    converted = subprocess.run(
        ["wslpath", windows_path],
        stdout=subprocess.PIPE
    )
    return converted.stdout.decode("UTF-8").strip()
def on_wsl(version: str=platform.uname().release) -> bool:
    """
    Report whether a kernel release string ends with "WSL2".

    Args:
        - version (str): Release string to inspect. DEFAULT: the value of
          'platform.uname().release' at the time this function was defined.

    Returns:
        - True if 'version' ends with "WSL2", otherwise False.
    """
    return version.endswith("WSL2")
def update_matlab_variables(matlab_eng: str, variable_dict: dict, skip_missing: bool=False) -> None:
    """
    Overwrite every value in 'variable_dict' with the entry of the same name
    read from 'matlab_eng.workspace'.

    Args:
        - matlab_eng: Object exposing a 'workspace' that can be indexed by variable name.
        - variable_dict (dict): Dictionary of variables to update
        - skip_missing (bool): Whether to skip variables whose lookup fails instead
          of raising. DEFAULT: False.

    Returns:
        None. Updates the variables in-place

    Raises:
        - FLAMEPyMatlabError: If a lookup fails and 'skip_missing' is False.
    """
    for name in variable_dict:
        try:
            variable_dict[name] = matlab_eng.workspace[name] # type: ignore
        except Exception as err:
            if not skip_missing:
                detail = f"Could not find {name} in {matlab_eng}. 'skip_missing' is False, so raising...\n{err.__class__.__name__}: {err}"
                LOGGER.exception(detail)
                raise FLAMEPyMatlabError(detail)
            LOGGER.warning(f"Could not find {name} in {matlab_eng}. 'skip_missing' is True, so continuing...")