Source code for snipar.utilities

import numpy as np
from os import path
import argparse
import re
[docs]def make_id_dict(x,col=0): """ Make a dictionary that maps from the values in the given column (col) to their row-index in the input array """ if len(x.shape)>1: x = x[:,col] id_dict = {} for i in range(0,x.shape[0]): id_dict[x[i]] = i return id_dict
[docs]def convert_str_array(x): """ Convert an ascii array to unicode array (UTF-8) """ x = np.array(x) x_shape = x.shape x = x.flatten() x_out = np.array([y.decode('UTF-8') for y in x]) return x_out.reshape(x_shape)
[docs]def encode_str_array(x): """ Encode a unicode array as an ascii array """ x = np.array(x) x_shape = x.shape x = x.flatten() x_out = np.array([y.encode('ascii') for y in x]) return x_out.reshape(x_shape)
[docs]def parse_obsfiles(obsfiles, obsformat='bed', append = True, wildcard = '@', chromosomes=None): obs_files = [] chroms = [] if wildcard in obsfiles: bed_ixes = obsfiles.split(wildcard) if chromosomes is None: chromosomes = np.arange(1,23) for i in chromosomes: obsfile = bed_ixes[0]+str(i)+bed_ixes[1]+'.'+obsformat if path.exists(obsfile): if append: obs_files.append(obsfile) else: obs_files.append(obsfile[:-(len(obsformat)+1)]) chroms.append(i) if len(obs_files)==0: raise(ValueError('Observed genotype files not found')) else: print(str(len(obs_files))+' files found') else: obsfile = obsfiles+'.'+obsformat if path.exists(obsfile): obs_files = [obsfile] chroms = [0] else: raise(ValueError(obsfile+' not found')) return np.array(obs_files), np.array(chroms,dtype=int)
[docs]def parse_filelist(obsfiles, impfiles, obsformat, chromosomes=None): obs_files = [] imp_files = [] chroms = [] if '@' in obsfiles and impfiles: bed_ixes = obsfiles.split('@') imp_ixes = impfiles.split('@') if chromosomes is None: chromosomes = np.arange(1,23) for i in chromosomes: obsfile = bed_ixes[0]+str(i)+bed_ixes[1]+'.'+obsformat impfile = imp_ixes[0]+str(i)+imp_ixes[1]+'.hdf5' if path.exists(impfile) and path.exists(obsfile): obs_files.append(obsfile) imp_files.append(impfile) chroms.append(i) if len(imp_files)==0: raise(ValueError('Observed/imputed genotype files not found')) else: print(str(len(imp_files))+' matched observed and imputed genotype files found') else: obsfile = obsfiles+'.'+obsformat impfile = impfiles+'.hdf5' if path.exists(obsfile) and path.exists(impfile): obs_files = [obsfile] imp_files = [impfile] chroms = [0] else: if not path.exists(obsfile): raise(ValueError(obsfile+' not found')) if not path.exists(impfile): raise(ValueError(impfile+' not found')) return np.array(obs_files), np.array(imp_files), np.array(chroms,dtype=int)
[docs]def outfile_name(outprefix,outsuffix,chrom=None): if '@' in outprefix: if chrom is None: raise(ValueError('Must provide chromosome number with wildcard character')) outprefix = outprefix.split('@') outprefix = outprefix[0]+str(chrom)+outprefix[1] return outprefix+outsuffix elif chrom is not None: return outprefix+'chr_'+str(chrom)+outsuffix else: return outprefix+outsuffix
[docs]def parseNumRange(string): """reads either a int or a range""" match_range = re.match(r' *(\d+) *- *(\d+) *', string) match_list = re.match(r' *(\d+) *', string) if match_range: start = int(match_range.group(1)) end = int(match_range.group(2)) result = [str(n) for n in range(start, end+1)] elif match_list: result = match_list.group(0) else: raise Exception(f"{string} is neither a range of the form x-y nor a list of integers of the form x y z") return result
[docs]class NumRangeAction(argparse.Action): """flattens and sorts the resulting num range. also removes duplicates""" def __call__(self, parser, args, values, option_string=None): result = [] for v in values: if isinstance(v,list): result += v if isinstance(v,str): result.append(v) result = np.unique(result).astype(int) result.sort() result = result.astype(str).tolist() setattr(args, self.dest, result)