Source code for intervalframe.core.IntervalSeries

from ailist import IntervalArray, LabeledIntervalArray
import pandas as pd
import numpy as np
import copy as cp
from .index import indexers
from tabulate import tabulate
from bcpseg import bcpseg
import cbseg
from collections import Counter
from . import IntervalFrame


#_mpl_repr
[docs]class IntervalSeries(object): """ Annotated augmented interval list :class:`~intervalframe.IntervalFrame` stores a intervals """ def __init__(self, intervals, series=None, labels=None, dtype=None, copy_series=False, copy_intervals=False): """ Initialize IntervalFrame Parameters ---------- intervals : AIList Intervals to be stored sereis : pandas.Series DataFrame to annotate intervals with labels : array-like Labels for hierarchical indexing dtype : Dtype of series copy_sereies : bool Whether to copy Series copy_intervals : bool Whether to copy AIList Returns ------- None """ # Determine if intervals need to be copied if copy_intervals: intervals = cp.copy(intervals) # Initialize Index if isinstance(intervals, IntervalArray) or isinstance(intervals, LabeledIntervalArray): self.index = intervals else: raise TypeError("Unrecognized input for intervals.") # Initialize Series if series is None: # Intervals given if intervals is None: if dtype is None: series = pd.Series([], index=range(0)) else: series = pd.Series([], index=range(0)).astype(dtype, copy=False) else: if dtype is None: series = pd.DataFrame([], index=range(len(self.index))) else: series = pd.DataFrame([], index=range(len(self.index))).astype(dtype, copy=False) elif isinstance(series, pd.Series): if copy_series: series = series.copy(deep=True) # Set series self.series = series # Make sure index is frozen self.index.freeze() def __repr__(self): """ IntervalFrame representation """ # Initialized string repr_string = "" # If no columns present if self.series.shape[0] == 0: repr_string += repr(self.index) repr_string += repr(self.series) # If columns present else: # Determine dimensions n_rows = min(self.series.shape[0], 5) # Initialize values repr_list = [[] for i in range(n_rows+1)] # Determine column names repr_list[0] = ["interval"] repr_list[0] += [str(self.series.name)] i = 0 # track rows for interval in self.index: if i >= n_rows: break #repr_list[i+1].append(repr(interval).split(",")[0] + ")") repr_list[i+1].append(repr(interval)) repr_list[i+1] += [str(self.series.values[i])] i += 1 # Create tabulate table repr_string = tabulate(repr_list, headers="firstrow") return repr_string @property def shape(self): """ """ return self.series.shape @property def iloc(self): """ """ return indexers.iLocator(self) @property def loc(self): """ """ return indexers.Locator(self) @property def values(self): """ """ return self.series.values
[docs] @staticmethod def from_array(starts, ends, labels=None): """ """ # Add intervals if labels is None: index = IntervalArray() index.from_array(starts, ends) else: index = LabeledIntervalArray() index.from_array(starts, ends, labels) # Create IntervalSeries iseries = IntervalSeries(index) return iseries
[docs] def starts(self): """ """ # Extract starts in intervals starts = self.index.extract_starts() return starts
[docs] def ends(self): """ """ # Extract ends in intervals ends = self.index.extract_ends() return ends
[docs] def intersect(self, start, end, label=None): """ Find intersecting intervals Paramters --------- start : int Starting position end : int Ending position label : str Label to intersect with [default: None] Returns ------- overlaps : IntervalFrame Overlapping intervals """ # Intersect if label is None: overlaps, overlap_index = self.index.intersect(start, end, return_intervals=True, return_index=True) else: overlaps, overlap_index = self.index.intersect(start, end, label=str(label), return_intervals=True, return_index=True) # Create df if len(self.series) > 0: series = pd.Series(self.series.values[overlap_index]) else: series = pd.Series([], index=range(len(overlaps))) # Create IntervalFrame overlaps = IntervalSeries(overlaps, series, copy_intervals=False, copy_series=False) return overlaps
[docs] def subtract(self, iframe): """ Subtract intersecting regions for IntervalFrame Parameters ---------- iframe : IntervalFrame Intervals to remove Returns ------- subtracted_frame : IntervalFrame Intervals with removed regions """ pass
[docs] def difference(self, iframe): """ Remove any overlapping intervals Parameters ---------- iframe : IntervalFrame Intervals to remove Returns ------- diff_frame : IntervalFrame Intervals that do not overlap """ pass
[docs] def nhits(self, start, end): """ Find number of intersecting intervals Paramters --------- start : int Starting position end : int Ending position Returns ------- n : int Number of overlapping intervals """ pass
[docs] def nhits_from_array(self, starts, ends): """ Find number of intersecting intervals Paramters --------- start : numpy.ndarray Starting position end : numpoy.ndarray Ending position Returns ------- n : numpy.ndarray Number of overlapping intervals """ pass
[docs] def coverage(self): """ Find number of intervals overlapping every position in the IntervalFrame Parameters ---------- None Returns ------- pandas.Series{double} Position on index and coverage as values """ pass
[docs] def bin_coverage(self, bin_size=100000, min_length=None, max_length=None): """ Find sum of coverage within binned positions Parameters ---------- bin_size : int Size of the bin to use min_length : int Minimum length of intervals to include [default = None] max_length : int Maximum length of intervals to include [default = None] Returns ------- cov_iframe : IntervalFrame Positions of coverage values """ # Determine nhits #ailist_cov = self.intervals.bin_coverage(bin_size=bin_size, min_length=min_length, max_length=max_length) # Create AIList from pd.Series index #positions = AIList() #positions.from_array(ailist_cov.index.values, #ailist_cov.index.values + bin_size, #np.arange(len(ailist_cov)), #ailist_cov.values) # Construct IntervalFrame #cov_iframe = IntervalFrame(intervals=positions, df=ailist_cov.to_frame()) #return cov_iframe pass
[docs] def bin_nhits(self, bin_size=100000, min_length=None, max_length=None): """ Find number of intervals overlapping binned positions Parameters ---------- bin_size : int Size of the bin to use min_length : int Minimum length of intervals to include [default = None] max_length : int Maximum length of intervals to include [default = None] Returns ------- nhits_iframe : IntervalFrame Position of nhits values """ # Determine nhits nhits_bins, nhits_index = self.index.bin_nhits(bin_size, min_length, max_length) # Construct IntervalFrame nhits_df = pd.DataFrame(nhits_index, index=range(len(nhits_index)), columns=["nhits"]) nhits_iframe = IntervalSeries(nhits_bins, nhits_df, copy_intervals=False, copy_df=False) return nhits_iframe
[docs] def overlap(self, iframe, key="overlap"): """ Find overlaps in one IntervalFrame with another Parameters ---------- iframe : IntervalFrame Intervals used to annotate key : str Name of the column to use in resulting IntervalFrame Returns ------- results_iframe : IntervalFrame Intervals with column indicating index of overlap """ # Find overlaps if isinstance(iframe.index, LabeledIntervalArray) and isinstance(self.index, LabeledIntervalArray): query_index, ref_index = self.index.intersect_from_LabeledIntervalArray(iframe.index, return_intervals=False, return_index=True) elif isinstance(iframe.index, IntervalArray) and isinstance(self.index, IntervalArray): query_index, ref_index = self.index.intersect_from_IntervalArray(iframe.index, return_intervals=False, return_index=True) else: raise TypeError("IntervalFrames must have same type of index.") # Index iframes results_iframe = IntervalFrame.IntervalFrame(self.index[ref_index], copy_intervals=False) results_iframe.df["overlap"] = query_index #Create intervals #results_intervals = self.index[ref_index] # Create df #if self.df.shape[1] > 0: #df = pd.DataFrame(self.df.values[ref_index,:], #columns=self.df.columns.values).astype(self.df.dtypes.to_dict(), copy=False) #df = self.df.iloc[ref_index,:].reset_index(drop=True, inplace=True) #else: #df = pd.DataFrame([], index=range(len(results_intervals))) # Create IntervalFrame #results_iframe = IntervalFrame(results_intervals, df, copy_intervals=False, copy_df=False) #results_iframe.df["overlap"] = query_index return results_iframe
[docs] def merge(self, gap=0): """ Annotate values in one IntervalFrame with another Parameters ---------- gap : int Allowed gap between intervals [default: 0] Returns ------- merged_iframe : IntervalFrame Merged intervals """ # Merge intervals merged_index = self.index.merge(gap=gap) # Create IntervalFrame merged_iframe = IntervalSeries(merged_index, copy_intervals=False) return merged_iframe
[docs] def segment(self, method="bcp_online", cutoff=0.5, hazard=100, shuffles=5000, p=0.00005): """ Annotate values in one IntervalFrame with another Parameters ---------- method : str Method for segmenting intervals cutoff : float (default = 0.5) Cutoff for bcp methods hazard : int (default = 100) Hazard values for bcp methods shuffles : int (default = 5000) Number of shuffles for cbs method p : float (default = 0.00005) Pvalue cutoff for cbs method Returns ------- segment_iseries : IntervalSeries Segmented Intervals """ # Segment intervals for IntervalArray if method == "bcp_online": if isinstance(self.index, IntervalArray): segment_intervals = bcpseg(self.series.values, cutoff=cutoff, method="online", hazard=hazard) else: segment_intervals = bcpseg(self.series.values, labels=self.index.extract_labels(), cutoff=cutoff, method="online", hazard=hazard) elif method == "bcp_online_both": if isinstance(self.index, IntervalArray): segment_intervals = bcpseg(self.series.values, cutoff=cutoff, method="online_both", hazard=hazard) else: segment_intervals = bcpseg(self.dseries.values, labels=self.index.extract_labels(), cutoff=cutoff, method="online_both", hazard=hazard) elif method == "bcp_offline": if isinstance(self.index, IntervalArray): segment_intervals = bcpseg(self.series.values, cutoff=cutoff, method="offline", hazard=hazard) else: segment_intervals = bcpseg(self.series.values, labels=self.index.extract_labels(), cutoff=cutoff, method="offline", hazard=hazard) elif method == "cbs": if isinstance(self.index, IntervalArray): segment_intervals = cbseg.segment(self.series.values, shuffles=shuffles, p=p) else: segment_intervals = cbseg.segment(self.series.values, labels=self.index.extract_labels(), shuffles=shuffles, p=p) else: raise NameError("method input not recognized.") #Re-index segments segment_intervals.index_with_aiarray(self.index) # Create IntervalSeries segment_iseries = IntervalSeries(segment_intervals) return segment_iseries
[docs] def downsample(self, proportion): """ Randomly downsample intervals Parameters ---------- proportion : float Proportion of intervals to keep Returns ------- filtered_iseries : IntervalSeries Downsampled values """ # Downsample filtered_intervals, filtered_index = self.index.downsample(proportion, return_intervals=True, return_index=True) # Create series series = pd.Series([], index=range(len(filtered_intervals))) # Create IntervalSeries filtered_iseries = IntervalSeries(filtered_intervals, series, copy_intervals=False, copy_series=False) return filtered_iseries
[docs] def length_dist(self): """ Calculate length distribution of intervals Parameters ---------- None Returns ------- length_distribution : numpy.ndarray Length distribution """ # Calculate length distribution length_distribution = self.index.length_dist() return length_distribution
[docs] def wps(self, protection=60, min_length=None, max_length=None): """ Calculate Window Protection Score for each position in AIList range Parameters ---------- protection : int Protection window to use min_length : int Minimum length of intervals to include [default = None] max_length : int Maximum length of intervals to include [default = None] label : str Label for hierarchical indexing Returns ------- scores : dict of pandas.Series or pandas.Series Position on index and WPS as values """ # Calculate WPS without label scores = self.index.wps(protection, min_length, max_length) return scores
[docs] def exact_match(self, iframe): """ Find exact matches between LabeledIntervalArrays Parameters ---------- iframe : LabeledIntervalArray Intervals to match Returns ------- filtered_iseries : IntervalSeries Matched intervals """ # Determine matches filtered_intervals, filtered_index = self.index.filter_exact_match(iframe.index) # Create series series = pd.Series([], index=range(len(filtered_intervals))) # Create IntervalSeries filtered_iseries = IntervalSeries(filtered_intervals, series, copy_intervals=False, copy_series=False) return filtered_iseries
[docs] def copy(self): """ Copy IntervalSeries Parameters ---------- None Returns ------- copy_iseries : IntervalSeries Copied intervals """ # Make copy of IntervalSeries copy_iseries = IntervalSeries(self.index, self.series, copy_intervals=True, copy_series=True) return copy_iseries