Source code for snipar.tests.test_pedigree_creation

import unittest
import subprocess
import pandas as pd
from snipar.imputation.preprocess_data import create_pedigree, add_control
import networkx as nx
import os
import numpy as np
from snipar.tests.utils import *

[docs]def node_match(node1, node2): return node1["observed"] == node2["observed"]
[docs]def create_graph_from_pedigree(pedigree): fam_tree = nx.DiGraph() observed = {} for index, row in pedigree.iterrows(): iid = row["IID"] father = row["FATHER_ID"] mother = row["MOTHER_ID"] if iid not in fam_tree: fam_tree.add_node(iid) if father not in fam_tree: fam_tree.add_node(iid) if mother not in fam_tree: fam_tree.add_node(iid) observed[iid] = True observed[father] = observed.get(father, False) observed[mother] = observed.get(mother, False) fam_tree.add_edge(mother, iid) fam_tree.add_edge(father, iid) nx.set_node_attributes(fam_tree, observed, 'observed') return fam_tree
[docs]class TestPedigree(SniparTest):
[docs] def test_create_pedigree(self): result = create_pedigree(f"{tests_root}/test_data/pedigree_creation_sample.king", f"{tests_root}/test_data/pedigree_creation_sample.agesex", ).sort_values(by=['FID', "IID"]) expected = pd.read_csv(f"{tests_root}/test_data/pedigree_creation_sample.ped", delim_whitespace=True).sort_values(by=['FID', "IID"]) result_graph = create_graph_from_pedigree(result) expected_graph = create_graph_from_pedigree(expected) equality = nx.is_isomorphic(result_graph, expected_graph, node_match) self.assertTrue(equality)
[docs] def test_add_control(self): pedigree = pd.read_csv(f"{tests_root}/test_data/pedigree_creation_sample.ped", delim_whitespace=True).sort_values(by=['FID', "IID"]).astype(str) pedigree["has_father"] = pedigree["FATHER_ID"].isin(pedigree["IID"]) pedigree["has_mother"] = pedigree["MOTHER_ID"].isin(pedigree["IID"]) controlled_pedigree = add_control(pedigree).astype(str) controlled_fams = [fid[3:] for fid in controlled_pedigree["FID"] if fid.startswith("_")] has_father = pedigree["FATHER_ID"].isin(pedigree["IID"]) has_mother = pedigree["MOTHER_ID"].isin(pedigree["IID"]) sibs = pedigree[has_father & has_mother].groupby(["FID", "FATHER_ID", "MOTHER_ID"]).agg({"IID":lambda x:len(list(x))}) sibs = sibs.groupby("FID").agg({"IID":max}).reset_index() no_parent_control = sibs[sibs["IID"]>1]["FID"].values.tolist() one_parent_control = pedigree[pedigree["has_father"] & pedigree["has_mother"]]["FID"].values.tolist() expected_controlls = no_parent_control + one_parent_control self.assertEqual(set(controlled_fams), set(expected_controlls)) controlled_pedigree = controlled_pedigree[controlled_pedigree["FID"].str.startswith("_")] controlled_pedigree["FID"] = controlled_pedigree["FID"].str[3:] merged = pd.merge(pedigree, controlled_pedigree, on=["FID", "IID"]) self.assertEqual(merged.shape[0], controlled_pedigree.shape[0])