Commit 4e7e93dd authored by Beatrice Tan's avatar Beatrice Tan
Browse files

Splitted report functions in two seperate scripts.

parent d8220aba
import numpy as np
import matplotlib
import matplotlib.pyplot as plt
from matplotlib_venn import venn2
import pandas as pd
import seaborn as sns
from scipy.stats import ttest_ind
import os.path
import pyensembl
from math import log10
class parse:
"""Functions to parse the outputs from tools"""
def gistic_results(self, result_dir):
"""Create a dictionary with the recurrent regions called by GISTIC2.
Format: {"chr#:#-#" : (chrom, start, end, cnv_type, qval, [gene, gene])}"""
cnv_dict = {}
with open(os.path.join(result_dir, "all_lesions.conf_75.txt"), 'r') as lesions:
lesions.readline()
for lesion in lesions:
stats = lesion.strip().split("\t")
if stats[0].endswith("CN values"):
continue
else:
CNV_id = stats[2].split("(")[0]
chrom, bp = CNV_id.split(":")
start, end = bp.split("-")
cnv_type = 'amp' if stats[0].startswith("Amplification") else 'del'
qval = log10(float(stats[5])) #Convert q-value to log10(q-value)
#gene_file = os.path.join(result_dir, cnv_type + "_genes.conf_75.txt")
#list_genes = self.gistic_genes(gene_file, CNV_id)
list_genes = self.genes_locus(chrom, int(start), int(end))
cnv_data = [chrom, start, end, cnv_type, qval, list_genes]
cnv_dict[CNV_id] = tuple(cnv_data)
return cnv_dict
def genes_locus(self, chrom, start, end):
"""Get list of gene IDs at certain location."""
IDs = []
gene_info = ensembl.genes_at_locus(chrom, start, end)
for gene in gene_info:
ID = gene.gene_id
IDs.append(ID)
return IDs
def gistic_genes(self, gene_file, CNV_id):
"""Extract list of genes for a given recurrent CNV from GISTIC2 output."""
out_genes = []
with open(gene_file, 'r') as genes:
for i in range(3):
genes.readline()#next(genes)
column_CNV = genes.readline().split("\t").index(CNV_id)
for line in genes:
if len(line.split("\t")) >= column_CNV:
gene = line.split("\t")[column_CNV]
if gene != '':
out_genes.append(gene)
return out_genes
def rubic_results(self, results_gains, results_losses):
"""Create a dictionary with the recurrent regions called by RUBIC.
Format: {"chr#:#-#" : (chrom, start, end, cnv_type, (qval_left, qval_right), [gene, gene])}"""
cnv_dict = {}
for result_file in results_gains, results_losses:
with open(result_file, 'r') as lesions:
lesions.readline()
for lesion in lesions:
chrom, start, end, qval_left, qval_right, genes = lesion.split("\t")
CNV_id = "chr" + chrom + ":" + start + "-" + end
cnv_type = 'amp' if result_file.endswith('gains.txt') else 'del'
#list_genes = genes.replace("[", "").replace("]", "").strip().split(",")
list_genes = self.genes_locus(chrom, int(start), int(end))
cnv_data = [chrom, start, end, cnv_type, (qval_left, qval_right), list_genes]
cnv_dict[CNV_id] = tuple(cnv_data)
return cnv_dict
def gene_file(self, gene_file, top_nr):
"""Parse a file with a list of genes (one gene per line or first column).
Top_nr: the number of genes to extract (requires ranked gene list file), use np.inf to extract all genes."""
list_genes = []
num_read = 0
with open(gene_file, 'r') as genes:
for line in genes:
if line.startswith("Supplementary") or line.startswith("\t") or line.startswith("Gene"): #deal with supplement files with one or more header lines
continue
else:
gene = line.split("\t")[0]
list_genes.append(gene.strip())
if top_nr != np.inf:
list_genes = list_genes[0:top_nr]
return list_genes
def gene_file_IDs(self, gene_file, top_nr):
"""Parse a file with a list of genes (one gene per line or first column).
Top_nr: the number of genes to extract (requires ranked gene list file), use np.inf to extract all genes."""
list_genes = []
num_read = 0
with open(gene_file, 'r') as genes:
for line in genes:
if line.startswith("Supplementary") or line.startswith("\t") or line.startswith("Gene"): #deal with supplement files with one or more header lines
continue
else:
gene = line.split("\t")[0]
try:
gene_ID = ensembl.gene_ids_of_gene_name(gene)
list_genes = list_genes + gene_ID
except:
pass
if top_nr != np.inf:
list_genes = list_genes[0:top_nr]
return list_genes
def biomart_gene_info(self, biomart_file):
"""Parse file with all human genes from biomart into dictionary"""
biomart_dict = {}
with open(biomart_file, 'r') as biomart:
biomart.readline()
for line in biomart:
gene_id, gene_name, chrom, start, end = line.strip().split("\t")
if chrom.isnumeric(): #check if gene is located on chromosome or contig
dict_info = (gene_id, chrom, start, end)
if gene_name not in biomart_dict:
biomart_dict[gene_name] = [dict_info]
else:
biomart_dict[gene_name] = biomart_dict[gene_name] + [dict_info]
print(biomart_dict[gene_name])
#if gene_name.startswith("MIR"):
# ensembl_name = "hsa-mir-" + gene_name.strip("MIR")
# biomart_dict[ensembl_name] = gene_id, chrom, start, end
return biomart_dict
class stats:
"""Functions to calculate stats for the reports."""
def calculate_stats(self, alt_dict, census_file, known_genes_file, used_tool):
"""Calculate stats on the results from GISTIC2 and RUBIC."""
return_info = []
for cnv_type in 'amp', 'del':
tot_size, nr_genes, nr_reg_nogenes, nr_reg = 0, 0, 0, 0
all_genes = []
for alt in alt_dict.keys():
if alt_dict[alt][3] == cnv_type: #get amp stats
nr_reg += 1
tot_size += int(alt_dict[alt][2]) - int(alt_dict[alt][1])
nr_genes += len(alt_dict[alt][5]) #long list genes > unique genes
all_genes += [alt_dict[alt][5]]
type_name = 'Amplifications' if cnv_type == 'amp' else 'Deletions'
avg_size = int(tot_size / nr_reg / 1000) if nr_reg != 0 else 'N/A' #avg size in kb
stats = [used_tool, type_name, nr_reg, avg_size, tot_size / 1000000, nr_genes] #tot size in mb
stats = list(map(str, stats))
for ref_genes in census_file, known_genes_file:
ref_stat = self.overlap_stats(all_genes, ref_genes)
stats.append(ref_stat)
return_info.append(stats)
return return_info
def overlap_stats(self, gene_list, census_file):
"""Calculate the overlap with a file with a list of genes"""
census_genes = parse().gene_file_IDs(census_file, np.inf)
census_count = 0
for alt in gene_list:
census = False
for gene in alt:
if gene in census_genes:
census = True
if census == True:
census_count += 1
percent_census = (census_count / len(gene_list) * 100) if census_count != 0 else 0
stat_str = str(census_count) + " (" + str(round(percent_census, 2)) + "%)"
return stat_str
def get_precision_regions(self, known_regions): #Wat doet dit?
"""Give indication of precision of call based on capturing only the gene as recurrent region."""
percentages = []
new_dict = {}
for region in known_regions.keys():
genes = known_regions[region]
ends, starts = [], []
gene_positions = []
for gene in genes:
loci = ensembl.loci_of_gene_names(gene)
for locus in loci:
starts.append(locus.start)
ends.append(locus.end)
pos = "chr" + str(locus.contig) + ":" + str(locus.start) + "-" + str(locus.end)
gene_positions.append(pos)
gene_pos = range(min(starts), max(ends))
start_region, end_region = list(map(int, region.split(":")[1].split("-")))
xs = set(range(start_region, end_region))
overlap = len(xs.intersection(gene_pos))
perc_overlap = overlap / len(gene_pos) * 100
percentages.append(perc_overlap)
known_regions[region] = [known_regions[region], gene_positions]
return int(sum(percentages) / float(len(percentages))), known_regions
......@@ -8,6 +8,7 @@ from scipy.stats import ttest_ind
import os.path
import pyensembl
from math import log10
from ParseResults import parse, stats
def install_ensembl(reference_genome):
"""Import ensembl if needed and load correct release based on reference genome."""
......@@ -23,189 +24,6 @@ def install_ensembl(reference_genome):
except:
pass
class parse:
"""Functions to parse the outputs from tools"""
def gistic_results(self, result_dir):
"""Create a dictionary with the recurrent regions called by GISTIC2.
Format: {"chr#:#-#" : (chrom, start, end, cnv_type, qval, [gene, gene])}"""
cnv_dict = {}
with open(os.path.join(result_dir, "all_lesions.conf_75.txt"), 'r') as lesions:
lesions.readline()
for lesion in lesions:
stats = lesion.strip().split("\t")
if stats[0].endswith("CN values"):
continue
else:
CNV_id = stats[2].split("(")[0]
chrom, bp = CNV_id.split(":")
start, end = bp.split("-")
cnv_type = 'amp' if stats[0].startswith("Amplification") else 'del'
qval = log10(float(stats[5])) #Convert q-value to log10(q-value)
#gene_file = os.path.join(result_dir, cnv_type + "_genes.conf_75.txt")
#list_genes = self.gistic_genes(gene_file, CNV_id)
list_genes = self.genes_locus(chrom, int(start), int(end))
cnv_data = [chrom, start, end, cnv_type, qval, list_genes]
cnv_dict[CNV_id] = tuple(cnv_data)
return cnv_dict
def genes_locus(self, chrom, start, end):
"""Get list of gene IDs at certain location."""
IDs = []
gene_info = ensembl.genes_at_locus(chrom, start, end)
for gene in gene_info:
ID = gene.gene_id
IDs.append(ID)
return IDs
def gistic_genes(self, gene_file, CNV_id):
"""Extract list of genes for a given recurrent CNV from GISTIC2 output."""
out_genes = []
with open(gene_file, 'r') as genes:
for i in range(3):
genes.readline()#next(genes)
column_CNV = genes.readline().split("\t").index(CNV_id)
for line in genes:
if len(line.split("\t")) >= column_CNV:
gene = line.split("\t")[column_CNV]
if gene != '':
out_genes.append(gene)
return out_genes
def rubic_results(self, results_gains, results_losses):
"""Create a dictionary with the recurrent regions called by RUBIC.
Format: {"chr#:#-#" : (chrom, start, end, cnv_type, (qval_left, qval_right), [gene, gene])}"""
cnv_dict = {}
for result_file in results_gains, results_losses:
with open(result_file, 'r') as lesions:
lesions.readline()
for lesion in lesions:
chrom, start, end, qval_left, qval_right, genes = lesion.split("\t")
CNV_id = "chr" + chrom + ":" + start + "-" + end
cnv_type = 'amp' if result_file.endswith('gains.txt') else 'del'
#list_genes = genes.replace("[", "").replace("]", "").strip().split(",")
list_genes = self.genes_locus(chrom, int(start), int(end))
cnv_data = [chrom, start, end, cnv_type, (qval_left, qval_right), list_genes]
cnv_dict[CNV_id] = tuple(cnv_data)
return cnv_dict
def gene_file(self, gene_file, top_nr):
"""Parse a file with a list of genes (one gene per line or first column).
Top_nr: the number of genes to extract (requires ranked gene list file), use np.inf to extract all genes."""
list_genes = []
num_read = 0
with open(gene_file, 'r') as genes:
for line in genes:
if line.startswith("Supplementary") or line.startswith("\t") or line.startswith("Gene"): #deal with supplement files with one or more header lines
continue
else:
gene = line.split("\t")[0]
list_genes.append(gene.strip())
if top_nr != np.inf:
list_genes = list_genes[0:top_nr]
return list_genes
def gene_file_IDs(self, gene_file, top_nr):
"""Parse a file with a list of genes (one gene per line or first column).
Top_nr: the number of genes to extract (requires ranked gene list file), use np.inf to extract all genes."""
list_genes = []
num_read = 0
with open(gene_file, 'r') as genes:
for line in genes:
if line.startswith("Supplementary") or line.startswith("\t") or line.startswith("Gene"): #deal with supplement files with one or more header lines
continue
else:
gene = line.split("\t")[0]
try:
gene_ID = ensembl.gene_ids_of_gene_name(gene)
list_genes = list_genes + gene_ID
except:
pass
if top_nr != np.inf:
list_genes = list_genes[0:top_nr]
return list_genes
def biomart_gene_info(self, biomart_file):
"""Parse file with all human genes from biomart into dictionary"""
biomart_dict = {}
with open(biomart_file, 'r') as biomart:
biomart.readline()
for line in biomart:
gene_id, gene_name, chrom, start, end = line.strip().split("\t")
if chrom.isnumeric(): #check if gene is located on chromosome or contig
dict_info = (gene_id, chrom, start, end)
if gene_name not in biomart_dict:
biomart_dict[gene_name] = [dict_info]
else:
biomart_dict[gene_name] = biomart_dict[gene_name] + [dict_info]
print(biomart_dict[gene_name])
#if gene_name.startswith("MIR"):
# ensembl_name = "hsa-mir-" + gene_name.strip("MIR")
# biomart_dict[ensembl_name] = gene_id, chrom, start, end
return biomart_dict
class stats:
"""Functions to calculate stats for the reports."""
def calculate_stats(self, alt_dict, census_file, known_genes_file, used_tool):
"""Calculate stats on the results from GISTIC2 and RUBIC."""
return_info = []
for cnv_type in 'amp', 'del':
tot_size, nr_genes, nr_reg_nogenes, nr_reg = 0, 0, 0, 0
all_genes = []
for alt in alt_dict.keys():
if alt_dict[alt][3] == cnv_type: #get amp stats
nr_reg += 1
tot_size += int(alt_dict[alt][2]) - int(alt_dict[alt][1])
nr_genes += len(alt_dict[alt][5]) #long list genes > unique genes
all_genes += [alt_dict[alt][5]]
type_name = 'Amplifications' if cnv_type == 'amp' else 'Deletions'
avg_size = int(tot_size / nr_reg / 1000) if nr_reg != 0 else 'N/A' #avg size in kb
stats = [used_tool, type_name, nr_reg, avg_size, tot_size / 1000000, nr_genes] #tot size in mb
stats = list(map(str, stats))
for ref_genes in census_file, known_genes_file:
ref_stat = self.overlap_stats(all_genes, ref_genes)
stats.append(ref_stat)
return_info.append(stats)
return return_info
def overlap_stats(self, gene_list, census_file):
"""Calculate the overlap with a file with a list of genes"""
census_genes = parse().gene_file_IDs(census_file, np.inf)
census_count = 0
for alt in gene_list:
census = False
for gene in alt:
if gene in census_genes:
census = True
if census == True:
census_count += 1
percent_census = (census_count / len(gene_list) * 100) if census_count != 0 else 0
stat_str = str(census_count) + " (" + str(round(percent_census, 2)) + "%)"
return stat_str
def get_precision_regions(self, known_regions): #Wat doet dit?
"""Give indication of precision of call based on capturing only the gene as recurrent region."""
percentages = []
new_dict = {}
for region in known_regions.keys():
genes = known_regions[region]
ends, starts = [], []
gene_positions = []
for gene in genes:
loci = ensembl.loci_of_gene_names(gene)
for locus in loci:
starts.append(locus.start)
ends.append(locus.end)
pos = "chr" + str(locus.contig) + ":" + str(locus.start) + "-" + str(locus.end)
gene_positions.append(pos)
gene_pos = range(min(starts), max(ends))
start_region, end_region = list(map(int, region.split(":")[1].split("-")))
xs = set(range(start_region, end_region))
overlap = len(xs.intersection(gene_pos))
perc_overlap = overlap / len(gene_pos) * 100
percentages.append(perc_overlap)
known_regions[region] = [known_regions[region], gene_positions]
return int(sum(percentages) / float(len(percentages))), known_regions
class ReportSegmentation():
"""Make a report of the segmentation file used as input."""
def __init__(self, segmentation_file, report_file):
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment