U
     xbOI                     @   sr   d dl Z d dlZd dlZd dlZd dlZddddgZdddd	d
gZdddZdddZ	dddZ
G dd dZdS )    NlsfpbsslurmlocalZOMP_NUM_THREADSZOPENBLAS_NUM_THREADSZMKL_NUM_THREADSZNUMEXPR_NUM_THREADSZVECLIB_MAXIMUM_THREADSxFc                 C   s,  |  }|dkr| gS | \}}}}|| ||  }}	|dkrB|}
n|	}
tt|
| }t|d}tt|
| }g }t|D ]~}|dkr|||  }|||d   }t||}|||||g q|||  }|||d   }t||}|||||g q|r(td	||
| td	|| |S )aS  Divide the input box into `num_split` different sub_boxes.

    :param box: [x0, y0, x1, y1]: list[int] of size 4
    :param num_split: int, the number of sub_boxes to split a box into
    :param dimension: str = 'y' or 'x', the dimension along which to split the boxes
    :return: sub_boxes: list(list(4 int)), the splited sub boxes
       y
   z/split along {} dimension ({:d}) into {:d} boxesz,    with each box up to {:d} in {} dimension)
lowerintnpceilmaxrangeminappendprintformat)box	num_split	dimension	print_msgx0y0x1y1lengthwidthdim_sizestep	sub_boxesir0r1c0c1 r&   M/home/exouser/operations/rsmas_insar/sources/MintPy/mintpy/objects/cluster.pysplit_box2sub_boxes   s4    	


r(   Tc                 C   sj   |rt dt i }tD ]}tj|d||< q| rft| } tD ]"}| tj|< |rBt d||  qB|S )a  limit/set the number of threads for all environmental variables to the given value
    and save/return the original value for backup purpose.
    Link: https://stackoverflow.com/questions/30791550

    Parameters: num_threads      - str, number of threads
                                   Set to None to return without changing env variables
    Returns:    num_threads_dict - dict, dictionary of the original number of threads
    z save the original settings of {}Nset {} = {})r   r   NUM_THREADS_ENV_LISTosenvirongetstr)num_threadsr   num_threads_dictkeyr&   r&   r'   set_num_threadsO   s    
r2   c                 C   s|   |rt dt |  D ]\\}}|tj kr|dkrXtj| |rvt d| q|tj|< |rt d|| qdS )z?Set back the number of threads for all environmental variables.z(roll back to the original settings of {}Nzremove env variable {}r)   )r   r   r*   itemsr+   r,   keyspop)r0   r   r1   valuer&   r&   r'   roll_back_num_threadsk   s    
r7   c                   @   s^   e Zd ZdZdddZdd Zdd Zd	d
 Zdd Zdd Z	e
dd Zdd Zdd ZdS )DaskClustera  
    Generic dask cluster wrapper for parallel processing in blocks.

    This object takes in a computing function for one block in space.
    For the computing function:
        1. the output is always several matrices (or None) and one box.
        2. the number of matrices may vary for different applications/functions.
        3. all matrices will be in 2D in size of (len, wid) or 3D in size of (n, len, wid),
           thus, the last two dimension (in space) will be the same.
    This charateristics allows the automatic result collection without prior knowledge
        of the computing funciton, thus being a generic wrapper.

    Check ifgram_inversion.py as an example.

    Nc                 K   s|   |  | _|| _|| _|| _| | j| j| _|   | j| jd< td| j | jdk	rltd| j d| _	d| _
dS )ai  Initiate object
        :param cluster_type: str, cluster to use (local, slurm, lsf, pbs)
        :param num_worker: str, number of workers to use
        :param config_name: str, the name of configuratino section
        :other param **kwargs: dask configuration parameters
                 e.g. config_name: str, the user specified config name to use
        config_namezinput Dask cluster type: {}Nzinput Dask config name: {})r
   cluster_type
num_workerr9   cluster_kwargsformat_num_workerformat_config_namer   r   clusterclient)selfr:   r;   r9   kwargsr&   r&   r'   __init__   s    	

zDaskCluster.__init__c              	   C   s   t d | jdkr(ddlm} | | _nddl}| jdkrL|jf | j| _nZ| jdkrh|jf | j| _n>| jdkr|j	f | j| _n"d	
| j}|d

t7 }t|t d| j  d}|rtdd}|| j d  W 5 Q R X dS )zInitiate the clusterzinitiate Dask clusterr   r   )LocalClusterNr   r   r   zun-recognized input cluster: {}z
supported clusters: {}
Fz dask_command_run_from_python.txtw)r   r:   dask.distributedrD   r?   dask_jobqueueZ
LSFClusterr<   Z
PBSClusterZSLURMClusterr   CLUSTER_LIST
ValueErrorZ
job_scriptopenwrite)rA   rD   rH   msg
debug_modefr&   r&   r'   rK      s&    




zDaskCluster.openc           	      C   s   ddl m} |d }t|| jddd}t|| _td| j td| j | j| j td	 || j| _	| j	j
d
d | |||\}}| ||||}|S )a  Wrapper function encapsulating submit_workers and compile_workers.

        For a generic result collection without prior knowledge of the computing function,
        we assume that the output of "func" is: several 2D or 3D matrices + a box

        :param func: function, a python function to run in parallel
        :param func_data: dict, a dictionary of the argument to pass to the function
        :param results: list[numpy.ndarray], arrays of the appropriate structure representing
               the final output of processed box (need to be in the same order as the function passed in
               submit_workers returns in)
        :return: results: tuple(numpy.ndarray), the processed results of the box
        r   )Clientr   r   F)r   r   r   zCsplit patch into {} sub boxes in x direction for workers to processz scale Dask cluster to {} workerszinitiate Dask clientT)check)rG   rP   r(   r;   lenr   r   r?   scaler@   get_versions
submit_jobcollect_result)	rA   func	func_dataresultsrP   r   r    futuressubmission_timer&   r&   r'   run   s    
zDaskCluster.runc           	      C   s`   t   }g }t|D ]B\}}td|| ||d< | jj|f|ddi}|| q||fS )a  Submit dask workers to the networking client that run the specified function (func)
        on the specified data (func_data). Each dask worker is in charge of a small subbox of the main box.

        :param func: function, a python function to run in parallel
        :param func_data: dict, a dictionary of the argument to pass to the function
        :param sub_boxes: list(np.nd.array), list of boxes to be computed in parallel

        :return futures: list(dask.Future), list of futures representing future dask worker calculations
        :return submission_time: time, the time of submission of the dask workers (used to determine worker
                runtimes as a performance diagnostic)
        z-submit a job to the worker for sub box {}: {}r   retries   )time	enumerater   r   r@   submitr   )	rA   rW   rX   r    r[   rZ   r!   sub_boxfuturer&   r&   r'   rU      s    zDaskCluster.submit_jobc              	   C   sZ  ddl m} d}||ddD ]6\}}|d7 }t | }	td| d|	dd	 |d
 }
|
\}}}}||d 8 }||d 8 }||d 8 }||d 8 }t|dd
 D ]\}}|dk	r|j}|dkr||| dddd||||f< q|dkr||| dd||||f< q|dkr8||| ||||f< qd|}|d7 }t|qq|S )a$  Compile results from completed workers and recompiles their sub outputs into the output
        for the complete box being worked on.
        :param futures: list(dask.Future), list of futures representing future dask worker calculations
        :param results: list[numpy.ndarray], arrays of the appropriate structure representing
               the final output of processed box (need to be in the same order as the function passed in
               submit_workers returns in)
        :param box: numpy.ndarray, the initial complete box being processed
        :param submission_time: time, the time of submission of the dask workers (used to determine worker
               runtimes as a performance diagnostic)
        :return: results: tuple(numpy.ndarray), the processed results of the box
        r   )as_completedT)with_resultsr   z	
FUTURE #z complete. Time used: z.0fz secondsN   r^      z*worker result has unexpected dimension: {}z!
it should be either 2 or 3 or 4!)rG   rd   r_   r   r`   ndimr   	Exception)rA   rZ   rY   r   r[   rd   Z
num_futurerc   Zsub_resultsZsub_trb   r   r   r   r   r!   
sub_resultZnum_dimrM   r&   r&   r'   rV     sN      
  

zDaskCluster.collect_resultc                 C   s0   | j   td | j  td |   dS )zPClose connections to dask client and cluster and moves dask output/error files. zclose dask clientzclose dask clusterN)r@   closer   r?   move_dask_stdout_stderr_files)rA   r&   r&   r'   rl   R  s
    

zDaskCluster.closec                 C   s   | dkrt  }d| }|dkr<t|}td|| nN|drt|t|dd  d }td|| |d	k s||krtd
t|}||krtd|| t	t|d d	}td| n"|dkrd| }t|t|}|S )zFormat dask num_worker.
        :param cluster_type: str
        :param num_worker: str, number of workers to use
        :return: num_worker: int, number of workers to use
        r   znumWorker = allztranslate {} to {}%Nrf   d   r   zInvalid numWorker percentage!z:
WARNING: input number of worker: {} > available cores: {}rh   z+change number of worker to {} and continue
z5numWorker = all is NOT supported for cluster type: {})
r+   	cpu_countr.   r   r   endswithr   floatrJ   r   )r:   r;   Znum_corerM   r&   r&   r'   r=   c  s*    


zDaskCluster.format_num_workerc                 C   s   ddl }| jdkrd| _| jS | jdkr8td | j| _t|jd }| j|kr|jd}d| j|}|d| j7 }t| | j| _| jS )	aV  Format dask config_name property based on presence or absence of user specified config name.

        :return: config_name: str, the config_name formatted as follows:
                 - the user specified config name if its exists in $DASK_CONFIG/dask.yaml
                 - the default cluster_type config in $DASK_CONFIG/dask.yaml
        r   Nr   zFinput config name is None, thus use the default (same as cluster type)Zjobqueueconfigz+Dask configuration "{}" was not found in {}z*
Falling back to default config name: "{}")	daskr:   r9   r   listrt   r-   r4   r   )rA   ru   Zconfig_namesZconfig_locationrM   r&   r&   r'   r>     s    


zDaskCluster.format_config_namec                 C   s   t  d}t  d}t  d}t|| | dkr6dS d}d}||fD ]$}tj|r`t| t| qF|| D ]}t|| qt|D ]}t|| qdS )zEMove *o and *e files produced by dask into stdout and sderr directoryz*.oz*.ez!dask_command_run_from_python.txt*r   NZstdout_daskZstderr_dask)	globrR   r+   pathisdirshutilrmtreemkdirmove)rA   Zstdout_filesZstderr_filesZ	job_filesZstdout_folderZstderr_folderZstd_diritemr&   r&   r'   rm     s    



z)DaskCluster.move_dask_stdout_stderr_files)N)__name__
__module____qualname____doc__rC   rK   r\   rU   rV   rl   staticmethodr=   r>   rm   r&   r&   r&   r'   r8      s   
 )'5
7r8   )r   F)NT)T)r+   r_   rw   rz   numpyr   rI   r*   r(   r2   r7   r8   r&   r&   r&   r'   <module>   s   
0

