from snipar.gtarray import gtarray
import numpy as np
from snipar.read import get_gts_matrix
from snipar.utilities import *
[docs]class pgs(object):
"""Define a polygenic score based on a set of SNPs with weights and ref/alt allele pairs.
Args:
snp_ids : :class:`~numpy:numpy.array`
[L] vector of SNP ids
weights : :class:`~numpy:numpy.array`
[L] vector of weights of equal length to snp_ids
alleles : :class:`~numpy:numpy.array`
[L x 2] matrix of ref and alt alleles for the SNPs. L must match size of snp_ids
Returns:
pgs : :class:`snipar.pgs`
"""
def __init__(self,snp_ids,weights,alleles):
if snp_ids.shape[0] == weights.shape[0] and alleles.shape[0] == weights.shape[0] and alleles.shape[1]==2:
self.snp_ids = snp_ids
self.snp_dict = make_id_dict(snp_ids)
self.weights = weights
self.alleles = alleles
else:
raise ValueError('All inputs must have the same dimension')
[docs] def compute(self, garray, cols=None):
"""Compute polygenic score values from a given genotype array. Finds the SNPs in the genotype array
that have weights in the pgs and matching alleles, and computes the PGS based on these SNPs and the
weights after allele-matching.
Args:
garray : :class:`sbreg.gtarray`
genotype array to compute PGS values for
cols : :class:`numpy:numpy.array`
names to give the columns in the output gtarray
Returns:
pg : :class:`snipar.gtarray`
2d gtarray with PGS values. If a 3d gtarray is input, then each column corresponds to
the second dimension on the input gtarray (for example, individual, paternal, maternal PGS).
If a 2d gtarray is input, then there will be only one column in the output gtarray. The
names given in 'cols' are stored in 'sid' attribute of the output.
"""
if type(garray) == gtarray:
garray.fill_NAs()
else:
raise ValueError('Must be of gtarray class')
if garray.alleles is None:
raise ValueError('Alleles of genotype matrix must be provided')
# Match SNP IDs
in_pgs_snps = np.array([x in self.snp_dict for x in garray.sid])
nmatch = np.sum(in_pgs_snps)
if nmatch==0:
raise ValueError('No overlap between PGS SNPs and genotype SNPs')
# Get weights
matched_snps = garray.sid[in_pgs_snps]
matched_alleles = garray.alleles[in_pgs_snps,:]
snp_indices = np.zeros((nmatch),dtype=int)
for i in range(0,nmatch):
snp_indices[i] = self.snp_dict[matched_snps[i]]
weights_compute = self.weights[snp_indices]
alleles = self.alleles[snp_indices,:]
# Match alleles and adjust weights
a_match = np.logical_and(alleles[:,0] == matched_alleles[:, 0], alleles[:,1] == matched_alleles[:, 1])
a_reverse = np.logical_and(alleles[:,0] == matched_alleles[:, 1], alleles[:,1] == matched_alleles[:, 0])
a_nomatch = np.logical_and(np.logical_not(a_match), np.logical_not(a_reverse))
n_nomatch = np.sum(a_nomatch)
if n_nomatch > 0:
print('Removing ' + str(n_nomatch) + ' SNPs due to allele mismatch between genotypes and PGS alleles')
weights_compute[a_nomatch] = 0
weights_compute[a_reverse] = -weights_compute[a_reverse]
### Compute PGS
if garray.ndim == 2:
pgs_val = garray.gts[:, in_pgs_snps].dot(weights_compute)
elif garray.ndim == 3:
pgs_val = np.zeros((garray.gts.shape[0], garray.gts.shape[1]), garray.dtype)
for i in range(0, garray.gts.shape[1]):
pgs_val[:, i] = garray.gts[:, i, in_pgs_snps].dot(weights_compute)
return gtarray(pgs_val, garray.ids, sid=cols, fams=garray.fams)
[docs]def compute(pgs, bedfile=None, bgenfile=None, par_gts_f=None, ped=None, sib=False, compute_controls=False, verbose=True):
"""Compute a polygenic score (PGS) for the individuals with observed genotypes and observed/imputed parental genotypes.
Args:
par_gts_f : :class:`str`
path to HDF5 file with imputed parental genotypes
gts_f : :class:`str`
path to bed file with observed genotypes
pgs : :class:`snipar.pgs`
the PGS, defined by the weights for a set of SNPs and the alleles of those SNPs
sib : :class:`bool`
Compute the PGS for genotyped individuals with at least one genotyped sibling and observed/imputed parental genotypes. Default False.
compute_controls : :class:`bool`
Compute polygenic scores for control families (families with observed parental genotypes set to missing). Default False.
Returns:
pg : :class:`snipar.gtarray`
Return the polygenic score as a genotype array with columns: individual's PGS, mean of their siblings' PGS, observed/imputed paternal PGS,
observed/imputed maternal PGS
"""
G = get_gts_matrix(bedfile=bedfile, bgenfile=bgenfile, par_gts_f=par_gts_f, ped=ped, snp_ids=pgs.snp_ids, sib=sib, compute_controls=compute_controls, verbose=verbose)
if sib:
cols = np.array(['proband', 'sibling', 'paternal', 'maternal'])
else:
cols = np.array(['proband', 'paternal', 'maternal'])
if compute_controls:
pgs_out = [pgs.compute(x,cols) for x in G[0:3]]
if sib:
o_cols = np.array(['proband', 'sibling', 'parental'])
else:
o_cols = np.array(['proband','parental'])
pgs_out.append(pgs.compute(G[3], o_cols))
return pgs_out
else:
return pgs.compute(G,cols)
[docs]def write(pg,filename,scale_PGS = False):
if scale_PGS:
# Rescale by observed proband PGS
pg.gts = pg.gts / np.std(pg.gts[:, 0])
####### Write PGS to file ########
pg_out = np.column_stack((pg.fams,pg.ids,pg.gts))
pg_header = np.column_stack((np.array(['FID','IID']).reshape(1,2),pg.sid.reshape(1,pg.sid.shape[0])))
pg_out = np.row_stack((pg_header,pg_out))
print('Writing PGS to ' + filename)
np.savetxt(filename, pg_out, fmt='%s')
return None