Source code for snipar.pgs

from snipar.gtarray import gtarray
import numpy as np
from snipar.read import get_gts_matrix
from snipar.utilities import *

[docs]class pgs(object): """Define a polygenic score based on a set of SNPs with weights and ref/alt allele pairs. Args: snp_ids : :class:`~numpy:numpy.array` [L] vector of SNP ids weights : :class:`~numpy:numpy.array` [L] vector of weights of equal length to snp_ids alleles : :class:`~numpy:numpy.array` [L x 2] matrix of ref and alt alleles for the SNPs. L must match size of snp_ids Returns: pgs : :class:`snipar.pgs` """ def __init__(self,snp_ids,weights,alleles): if snp_ids.shape[0] == weights.shape[0] and alleles.shape[0] == weights.shape[0] and alleles.shape[1]==2: self.snp_ids = snp_ids self.snp_dict = make_id_dict(snp_ids) self.weights = weights self.alleles = alleles else: raise ValueError('All inputs must have the same dimension')
[docs] def compute(self, garray, cols=None): """Compute polygenic score values from a given genotype array. Finds the SNPs in the genotype array that have weights in the pgs and matching alleles, and computes the PGS based on these SNPs and the weights after allele-matching. Args: garray : :class:`sbreg.gtarray` genotype array to compute PGS values for cols : :class:`numpy:numpy.array` names to give the columns in the output gtarray Returns: pg : :class:`snipar.gtarray` 2d gtarray with PGS values. If a 3d gtarray is input, then each column corresponds to the second dimension on the input gtarray (for example, individual, paternal, maternal PGS). If a 2d gtarray is input, then there will be only one column in the output gtarray. The names given in 'cols' are stored in 'sid' attribute of the output. """ if type(garray) == gtarray: garray.fill_NAs() else: raise ValueError('Must be of gtarray class') if garray.alleles is None: raise ValueError('Alleles of genotype matrix must be provided') # Match SNP IDs in_pgs_snps = np.array([x in self.snp_dict for x in garray.sid]) nmatch = np.sum(in_pgs_snps) if nmatch==0: raise ValueError('No overlap between PGS SNPs and genotype SNPs') # Get weights matched_snps = garray.sid[in_pgs_snps] matched_alleles = garray.alleles[in_pgs_snps,:] snp_indices = np.zeros((nmatch),dtype=int) for i in range(0,nmatch): snp_indices[i] = self.snp_dict[matched_snps[i]] weights_compute = self.weights[snp_indices] alleles = self.alleles[snp_indices,:] # Match alleles and adjust weights a_match = np.logical_and(alleles[:,0] == matched_alleles[:, 0], alleles[:,1] == matched_alleles[:, 1]) a_reverse = np.logical_and(alleles[:,0] == matched_alleles[:, 1], alleles[:,1] == matched_alleles[:, 0]) a_nomatch = np.logical_and(np.logical_not(a_match), np.logical_not(a_reverse)) n_nomatch = np.sum(a_nomatch) if n_nomatch > 0: print('Removing ' + str(n_nomatch) + ' SNPs due to allele mismatch between genotypes and PGS alleles') weights_compute[a_nomatch] = 0 weights_compute[a_reverse] = -weights_compute[a_reverse] ### Compute PGS if garray.ndim == 2: pgs_val = garray.gts[:, in_pgs_snps].dot(weights_compute) elif garray.ndim == 3: pgs_val = np.zeros((garray.gts.shape[0], garray.gts.shape[1]), garray.dtype) for i in range(0, garray.gts.shape[1]): pgs_val[:, i] = garray.gts[:, i, in_pgs_snps].dot(weights_compute) return gtarray(pgs_val, garray.ids, sid=cols, fams=garray.fams)
[docs]def compute(pgs, bedfile=None, bgenfile=None, par_gts_f=None, ped=None, sib=False, compute_controls=False, verbose=True): """Compute a polygenic score (PGS) for the individuals with observed genotypes and observed/imputed parental genotypes. Args: par_gts_f : :class:`str` path to HDF5 file with imputed parental genotypes gts_f : :class:`str` path to bed file with observed genotypes pgs : :class:`snipar.pgs` the PGS, defined by the weights for a set of SNPs and the alleles of those SNPs sib : :class:`bool` Compute the PGS for genotyped individuals with at least one genotyped sibling and observed/imputed parental genotypes. Default False. compute_controls : :class:`bool` Compute polygenic scores for control families (families with observed parental genotypes set to missing). Default False. Returns: pg : :class:`snipar.gtarray` Return the polygenic score as a genotype array with columns: individual's PGS, mean of their siblings' PGS, observed/imputed paternal PGS, observed/imputed maternal PGS """ G = get_gts_matrix(bedfile=bedfile, bgenfile=bgenfile, par_gts_f=par_gts_f, ped=ped, snp_ids=pgs.snp_ids, sib=sib, compute_controls=compute_controls, verbose=verbose) if sib: cols = np.array(['proband', 'sibling', 'paternal', 'maternal']) else: cols = np.array(['proband', 'paternal', 'maternal']) if compute_controls: pgs_out = [pgs.compute(x,cols) for x in G[0:3]] if sib: o_cols = np.array(['proband', 'sibling', 'parental']) else: o_cols = np.array(['proband','parental']) pgs_out.append(pgs.compute(G[3], o_cols)) return pgs_out else: return pgs.compute(G,cols)
[docs]def write(pg,filename,scale_PGS = False): if scale_PGS: # Rescale by observed proband PGS pg.gts = pg.gts / np.std(pg.gts[:, 0]) ####### Write PGS to file ######## pg_out = np.column_stack((pg.fams,pg.ids,pg.gts)) pg_header = np.column_stack((np.array(['FID','IID']).reshape(1,2),pg.sid.reshape(1,pg.sid.shape[0]))) pg_out = np.row_stack((pg_header,pg_out)) print('Writing PGS to ' + filename) np.savetxt(filename, pg_out, fmt='%s') return None