import numpy as np
import pandas as pd
import math
from copy import deepcopy
from abc import abstractmethod, ABCMeta
from bids.utils import listify
from itertools import chain
from six import add_metaclass
from bids.utils import matches_entities
@add_metaclass(ABCMeta)
class BIDSVariable(object):
    ''' Base representation of a column in a BIDS project.

    Subclasses must assign `self.index` (a DataFrame of entity columns that
    is row-aligned with `values`) before invoking this initializer, because
    `_index_entities()` reads it.
    '''

    # Columns that define special properties (e.g., onset, duration). These
    # will be stored separately from the main data object, and are accessible
    # as properties on the BIDSVariable instance.
    _property_columns = set()

    def __init__(self, name, values, source):
        ''' Store the core attributes and compute the shared entities.

        Args:
            name (str): Name of the column.
            values (Series, DataFrame): The values/amplitudes of the column.
            source (str): The type of BIDS variable file the data were
                extracted from.
        '''
        self.name = name
        self.values = values
        self.source = source
        self._index_entities()

    def clone(self, data=None, **kwargs):
        ''' Clone (deep copy) the current column, optionally replacing its
        data and/or any other attributes.

        Args:
            data (DataFrame, ndarray): Optional new data to substitute into
                the cloned column. Must have same dimensionality as the
                original.
            kwargs (dict): Optional keyword arguments containing new attribute
                values to set in the copy. E.g., passing `name='my_name'`
                would set the `.name` attribute on the cloned instance to the
                passed value.

        Returns:
            A deep copy of the current Variable with the requested changes.

        Raises:
            ValueError: If `data` does not have the same shape as the
                existing values.
        '''
        result = deepcopy(self)
        if data is not None:
            if data.shape != self.values.shape:
                raise ValueError("Replacement data has shape %s; must have "
                                 "same shape as existing data %s." %
                                 (data.shape, self.values.shape))
            # Replacement data is always stored as a DataFrame.
            result.values = pd.DataFrame(data)
        for attr, value in kwargs.items():
            setattr(result, attr, value)
        # NOTE: passing `name=...` only changes the `.name` attribute; any
        # name carried by the underlying `values` object is left untouched.
        return result

    def filter(self, filters=None, query=None, strict=False, inplace=False):
        ''' Retain only the rows that match the passed filters or query.

        Args:
            filters (dict): Dictionary of filters to apply. Keys can be either
                'amplitude' or any named entity. Values must be single values
                or lists.
            query (str): Optional query string to pass to df.query(). Will not
                be validated in any way, so must have valid column names. Takes
                precedence over filters in the event that both are passed.
            strict (bool): By default, keys in 'filters' that cannot be found
                in the Variable will be silently ignored. If strict=True, None
                will be returned in such cases.
            inplace (bool): If True, filtering is performed in place. If False,
                a filtered copy of the Variable is returned.

        Returns:
            The filtered copy when inplace=False. None when inplace=True, or
            when strict=True and a filter key is not found in the Variable.

        Raises:
            ValueError: If neither `filters` nor `query` is provided.
        '''
        if filters is None and query is None:
            raise ValueError("Either the 'filters' or the 'query' argument "
                             "must be provided!")

        if filters is not None and query is None:
            clauses = []
            for name, val in filters.items():
                if name != 'amplitude' and name not in self.index.columns:
                    if strict:
                        return None
                    continue
                oper = 'in' if isinstance(val, (list, tuple)) else '=='
                clauses.append('{name} {oper} {val}'.format(
                    name=name, oper=oper, val=repr(val)))
            query = ' and '.join(clauses)

        var = self if inplace else self.clone()

        if query:
            # to_df() always returns a fresh 0..n-1 index, so the index of
            # the query result holds row *positions*, not labels. Selecting
            # positionally keeps this correct even when `values`/`index`
            # carry non-contiguous labels (e.g. after an earlier filter).
            keep = np.asarray(self.to_df().query(query).index)
            var.values = var.values.iloc[keep]
            var.index = var.index.iloc[keep]
            # Keep the per-row property arrays (e.g. onset, duration) in
            # step with the retained rows, mirroring select_rows().
            for prop in var._property_columns:
                setattr(var, prop, getattr(var, prop)[keep])
            # NOTE(review): the only _build_entity_index() visible in this
            # file (DenseRunVariable) requires (run_info, sampling_rate), so
            # this call looks like it raises TypeError for dense variables.
            # Left as-is; confirm the intended behaviour before changing.
            if hasattr(self, '_build_entity_index'):
                var._build_entity_index()

        if not inplace:
            return var

    @classmethod
    def merge(cls, variables, name=None, **kwargs):
        ''' Merge/concatenate a list of variables along the row axis.

        Args:
            variables (list): A list of Variables to merge.
            name (str): Optional name to assign to the output Variable. By
                default, uses the same name as the input variables.
            kwargs: Optional keyword arguments to pass onto the class-specific
                merge() call. See merge_variables docstring for details.

        Returns:
            A single BIDSVariable of the same class as the input variables.
            When only one variable is passed, it is returned unchanged (and
            `name` is not applied).

        Raises:
            ValueError: If the variables do not all share the same name.

        Notes: see merge_variables docstring for additional details.
        '''
        variables = listify(variables)
        if len(variables) == 1:
            return variables[0]

        var_names = set([v.name for v in variables])
        if len(var_names) > 1:
            raise ValueError("Columns with different names cannot be merged. "
                             "Column names provided: %s" % var_names)

        if name is None:
            name = variables[0].name

        return cls._merge(variables, name, **kwargs)

    @classmethod
    @abstractmethod
    def _merge(cls, variables, name, **kwargs):
        ''' Class-specific merge implementation; see merge(). '''
        pass

    def get_grouper(self, groupby='run'):
        ''' Return a list suitable for use in groupby calls.

        Args:
            groupby (str, list): Name(s) of column(s) defining the grouper
                object. Anything that would be valid inside a .groupby() call
                on a pandas structure.

        Returns:
            A Series with one label per row; the label joins the string
            form of each grouping column's value with '@@@'.
        '''
        grouper = self.index.loc[:, groupby]
        # A single column name selects a Series; promote it to a one-column
        # frame so the row-wise apply below works for str and list alike.
        if isinstance(grouper, pd.Series):
            grouper = grouper.to_frame()
        return grouper.apply(lambda x: '@@@'.join(x.astype(str).values),
                             axis=1)

    def apply(self, func, groupby='run', *args, **kwargs):
        ''' Applies the passed function to the groups defined by the groupby
        argument. Works identically to the standard pandas df.groupby() call.

        Args:
            func (callable): The function to apply to each group.
            groupby (str, list): Name(s) of column(s) defining the grouping.
            args, kwargs: Optional positional and keyword arguments to pass
                onto the function call.

        Returns:
            The result of the pandas groupby(...).apply(...) call.
        '''
        grouper = self.get_grouper(groupby)
        return self.values.groupby(grouper).apply(func, *args, **kwargs)

    def to_df(self, condition=True, entities=True, **kwargs):
        ''' Convert to a DataFrame, with columns for name and entities.

        Args:
            condition (bool): If True, adds a column for condition name, and
                names the amplitude column 'amplitude'. If False, returns just
                onset, duration, and amplitude, and gives the amplitude column
                the current column name.
            entities (bool): If True, adds extra columns for all entities.
            kwargs: Accepted for signature compatibility; not used here.

        Returns:
            A DataFrame with a fresh 0..n-1 index.
        '''
        amp = 'amplitude' if condition else self.name
        data = pd.DataFrame({amp: self.values.values.ravel()})

        for sc in self._property_columns:
            data[sc] = getattr(self, sc)

        if condition:
            data['condition'] = self.name

        if entities:
            # Drop the entity index labels so the columns line up by row
            # position with the freshly built data frame.
            ent_data = self.index.reset_index(drop=True)
            data = pd.concat([data, ent_data], axis=1, sort=True)

        return data.reset_index(drop=True)

    def matches_entities(self, entities, strict=False):
        ''' Checks whether current Variable's entities match the input. '''
        return matches_entities(self, entities, strict)

    def _index_entities(self):
        ''' Sets `self.entities` to a dict of entities for the current
        Variable.

        Note: Only entity key/value pairs common to all rows in the Variable
        are stored. E.g., if a Variable contains events extracted from
        runs 1, 2 and 3 from subject '01', the stored dict will be
        {'subject': '01'}; the runs will be excluded as they vary across
        the Variable contents.
        '''
        constant = self.index.apply(lambda x: x.nunique() == 1)
        if constant.empty:
            self.entities = {}
        else:
            keep = self.index.columns[constant]
            self.entities = {k: self.index[k].dropna().iloc[0] for k in keep}
class SimpleVariable(BIDSVariable):
    ''' Represents a simple design matrix column that has no timing
    information.

    Args:
        name (str): Name of the column.
        data (DataFrame): A pandas DataFrame minimally containing a column
            named 'amplitude' as well as any identifying entities.
        source (str): The type of BIDS variable file the data were extracted
            from. Must be one of: 'events', 'physio', 'stim', 'regressors',
            'scans', 'sessions', 'participants', or 'beh'.
        kwargs: Optional keyword arguments. Accepted so that subclasses and
            split()/_merge() can pass extras; they are not used here.
    '''

    # Columns of the input data that are NOT treated as entities; every
    # other column ends up in the entity index.
    _entity_columns = {'condition', 'amplitude'}

    def __init__(self, name, data, source, **kwargs):
        ent_cols = list(set(data.columns) - self._entity_columns)
        # Reset the row labels on the entity index as well as on the values,
        # so both share the same 0..n-1 labels even when `data` arrives with
        # arbitrary labels (e.g. a group produced by split()). Label-aligned
        # operations such as apply() depend on this.
        self.index = data.loc[:, ent_cols].reset_index(drop=True)

        values = data['amplitude'].reset_index(drop=True)
        values.name = name

        super(SimpleVariable, self).__init__(name, values, source)

    def split(self, grouper):
        ''' Split the current Variable into multiple columns.

        Args:
            grouper (iterable): list to groupby, where each unique value will
                be taken as the name of the resulting column.

        Returns:
            A list of Variables of the same class as the current one, one
            per unique value in the grouper. Each is named
            '<current name>.<group value>'.
        '''
        data = self.to_df(condition=True, entities=True)
        data = data.drop('condition', axis=1)

        # run_info is only meaningful for subclasses that define it; it is
        # passed as None otherwise.
        run_info = getattr(self, 'run_info', None)
        subsets = []
        for label, group in data.groupby(grouper):
            col_name = '%s.%s' % (self.name, label)
            subsets.append(self.__class__(name=col_name, data=group,
                                          source=self.source,
                                          run_info=run_info))
        return subsets

    @classmethod
    def _merge(cls, variables, name, **kwargs):
        ''' Concatenate the variables' DataFrames row-wise and build a new
        instance of `cls` from the result. '''
        dfs = [v.to_df() for v in variables]
        data = pd.concat(dfs, axis=0, sort=True).reset_index(drop=True)
        data = data.rename(columns={name: 'amplitude'})
        return cls(name, data, source=variables[0].source, **kwargs)

    def select_rows(self, rows):
        ''' Truncate internal arrays to keep only the specified rows.

        Modifies the Variable in place.

        Args:
            rows (array): An integer or boolean array identifying the indices
                of rows to keep.
        '''
        self.values = self.values.iloc[rows]
        self.index = self.index.iloc[rows, :]
        for prop in self._property_columns:
            vals = getattr(self, prop)[rows]
            setattr(self, prop, vals)
class SparseRunVariable(SimpleVariable):
    ''' A sparse representation of a single column of events.

    Args:
        name (str): Name of the column.
        data (DataFrame): A pandas DataFrame minimally containing the columns
            'onset', 'duration', and 'amplitude'. It is not modified.
        run_info (list): A list of RunInfo objects carrying information about
            all runs represented in the Variable. A single object with a
            `duration` attribute is also accepted and wrapped in a list.
        source (str): The type of BIDS variable file the data were extracted
            from. Must be one of: 'events', 'physio', 'stim', 'regressors',
            'scans', 'sessions', 'participants', or 'beh'.
        kwargs: Optional keyword arguments passed onto superclass.

    Raises:
        TypeError: If `run_info` is neither a list nor an object with a
            `duration` attribute.
    '''

    _property_columns = {'onset', 'duration'}

    def __init__(self, name, data, run_info, source, **kwargs):
        if hasattr(run_info, 'duration'):
            run_info = [run_info]
        if not isinstance(run_info, list):
            raise TypeError("We expect a list of run_info, got %s"
                            % repr(run_info))
        self.run_info = run_info

        # Store the property columns as standalone arrays and hand the
        # superclass a frame without them. Working on a copy (rather than
        # popping columns off `data`) leaves the caller's DataFrame intact.
        prop_cols = list(self._property_columns)
        for sc in prop_cols:
            setattr(self, sc, data[sc].values.copy())
        data = data.drop(prop_cols, axis=1)

        super(SparseRunVariable, self).__init__(name, data, source, **kwargs)

    def get_duration(self):
        ''' Return the total duration of the Variable's run(s). '''
        return sum([r.duration for r in self.run_info])

    def to_dense(self, sampling_rate):
        ''' Convert the current sparse column to a dense representation.

        Args:
            sampling_rate (int, float): Sampling rate (in Hz) to use when
                constructing the DenseRunVariable.

        Returns:
            A DenseRunVariable.
        '''
        duration = int(math.ceil(sampling_rate * self.get_duration()))
        ts = np.zeros(duration, dtype=self.values.dtype)

        onsets = np.round(self.onset * sampling_rate).astype(int)
        durations = np.round(self.duration * sampling_rate).astype(int)

        run_i, start, last_ind = 0, 0, 0

        for i, val in enumerate(self.values.values):
            # An onset earlier than the previous one is taken to mean that
            # a new run has started, so shift the offset by the length of
            # the run just finished.
            if onsets[i] < last_ind:
                start += self.run_info[run_i].duration * sampling_rate
                run_i += 1
            _onset = int(start + onsets[i])
            _offset = int(_onset + durations[i])
            ts[_onset:_offset] = val
            last_ind = onsets[i]

        run_info = list(self.run_info)
        return DenseRunVariable(
            name=self.name,
            values=ts,
            run_info=run_info,
            source=self.source,
            sampling_rate=sampling_rate)

    @classmethod
    def _merge(cls, variables, name, **kwargs):
        ''' Merge as in SimpleVariable, additionally chaining together the
        run_info lists of all input variables. '''
        run_info = list(chain(*[v.run_info for v in variables]))
        return super(SparseRunVariable, cls)._merge(variables, name,
                                                    run_info=run_info,
                                                    **kwargs)
class DenseRunVariable(BIDSVariable):
    ''' A dense representation of a single column.

    Parameters
    ----------
    name : :obj:`str`
        The name of the column.
    values : :obj:`numpy.ndarray`
        The values/amplitudes to store.
    run_info : :obj:`list`
        A list of RunInfo objects carrying information about all runs
        represented in the Variable. A single object with a ``duration``
        attribute is also accepted and wrapped in a list.
    source : {'events', 'physio', 'stim', 'regressors', 'scans', 'sessions', 'participants', 'beh'}
        The type of BIDS variable file the data were extracted from.
    sampling_rate : :obj:`float`
        Sampling rate (in Hz). Must match the sampling rate used to generate
        the values.
    '''

    def __init__(self, name, values, run_info, source, sampling_rate):
        values = pd.DataFrame(values)

        if hasattr(run_info, 'duration'):
            run_info = [run_info]
        self.run_info = run_info
        self.sampling_rate = sampling_rate
        # Also sets self.timestamps as a side effect.
        self.index = self._build_entity_index(run_info, sampling_rate)

        super(DenseRunVariable, self).__init__(name, values, source)

    def split(self, grouper):
        '''Split the current DenseRunVariable into multiple columns.

        Parameters
        ----------
        grouper : :obj:`pandas.DataFrame`
            Binary DF specifying the design matrix to use for splitting. Number
            of rows must match current ``DenseRunVariable``; a new ``DenseRunVariable``
            will be generated for each column in the grouper.

        Returns
        -------
        A list of DenseRunVariables, one per column in the grouper, each
        named ``'<current name>.<grouper column>'``.
        '''
        # Multiplying by the binary design matrix zeroes out the samples
        # that do not belong to each column.
        values = grouper.values * self.values.values
        df = pd.DataFrame(values, columns=grouper.columns)
        return [DenseRunVariable(name='%s.%s' % (self.name, name),
                                 values=df[name].values,
                                 run_info=self.run_info,
                                 source=self.source,
                                 sampling_rate=self.sampling_rate)
                for name in df.columns]

    def _build_entity_index(self, run_info, sampling_rate):
        ''' Build the entity index from run information.

        For each run, the run's entities are repeated once per sample
        (``ceil(duration * sampling_rate)`` rows). As a side effect, sets
        ``self.timestamps`` to the concatenated per-run timestamps.

        Returns
        -------
        A DataFrame of entity values with a fresh 0..n-1 index.
        '''
        index = []
        # Sampling interval in whole milliseconds (rounded). Timestamps
        # restart from zero for every run.
        interval = int(round(1000. / sampling_rate))
        _timestamps = []
        for run in run_info:
            reps = int(math.ceil(run.duration * sampling_rate))
            ent_vals = list(run.entities.values())
            df = pd.DataFrame([ent_vals] * reps, columns=list(run.entities.keys()))
            ts = pd.date_range(0, periods=len(df), freq='%sms' % interval)
            _timestamps.append(ts.to_series())
            index.append(df)
        self.timestamps = pd.concat(_timestamps, axis=0, sort=True)
        return pd.concat(index, axis=0, sort=True).reset_index(drop=True)

    def resample(self, sampling_rate, inplace=False, kind='linear'):
        '''Resample the Variable to the specified sampling rate.

        Parameters
        ----------
        sampling_rate : :obj:`int`, :obj:`float`
            Target sampling rate (in Hz).
        inplace : :obj:`bool`, optional
            If True, performs resampling in-place. If False, returns a resampled
            copy of the current Variable. Default is False.
        kind : {'linear', 'nearest', 'zero', 'slinear', 'quadratic', 'cubic'}
            Argument to pass to :obj:`scipy.interpolate.interp1d`; indicates
            the kind of interpolation approach to use. See interp1d docs for
            valid values. Default is 'linear'.

        Returns
        -------
        The resampled copy when ``inplace`` is False; None otherwise.
        '''
        if not inplace:
            var = self.clone()
            var.resample(sampling_rate, True, kind)
            return var

        if sampling_rate == self.sampling_rate:
            return

        n = len(self.index)

        # Rebuild the entity index (and timestamps) at the new rate; its
        # length is the number of samples to interpolate to.
        self.index = self._build_entity_index(self.run_info, sampling_rate)

        x = np.arange(n)
        num = len(self.index)

        # Imported locally so scipy is only required when resampling.
        from scipy.interpolate import interp1d
        f = interp1d(x, self.values.values.ravel(), kind=kind)
        x_new = np.linspace(0, n - 1, num=num)
        self.values = pd.DataFrame(f(x_new))
        assert len(self.values) == len(self.index)

        self.sampling_rate = sampling_rate

    def to_df(self, condition=True, entities=True, timing=True, sampling_rate=None):
        '''Convert to a DataFrame, with columns for name and entities.

        Parameters
        ----------
        condition : :obj:`bool`
            If True, adds a column for condition name, and names the amplitude
            column 'amplitude'. If False, returns just onset, duration, and
            amplitude, and gives the amplitude column the current column name.
        entities : :obj:`bool`
            If True, adds extra columns for all entities.
        timing : :obj:`bool`
            If True, includes onset and duration columns (even though events are
            sampled uniformly). If False, omits them.
        sampling_rate : :obj:`float`, optional
            If given and different from the Variable's own rate, a resampled
            copy is converted instead; the Variable itself is not modified.
        '''
        if sampling_rate not in (None, self.sampling_rate):
            # Forward `timing` too, so that timing=False is honoured on the
            # resampling path as well.
            return self.resample(sampling_rate).to_df(condition, entities,
                                                      timing)

        df = super(DenseRunVariable, self).to_df(condition, entities)

        if timing:
            # Timestamps are stored with nanosecond resolution; convert to
            # seconds.
            df['onset'] = self.timestamps.values.astype(float) / 1e+9
            df['duration'] = 1. / self.sampling_rate

        return df

    @classmethod
    def _merge(cls, variables, name, sampling_rate=None, **kwargs):
        ''' Merge DenseRunVariables, resampling to a common rate if needed.

        Parameters
        ----------
        variables : :obj:`list`
            The DenseRunVariables to merge.
        name : :obj:`str`
            Name of the merged Variable.
        sampling_rate : :obj:`int`, :obj:`float`, :obj:`str`, optional
            A numeric rate forces that rate for all variables. Otherwise the
            common rate is used if all variables agree; if they differ,
            'auto' selects the highest rate found and anything else raises.

        Raises
        ------
        ValueError
            If the variables have different sampling rates and no usable
            ``sampling_rate`` was given.
        '''
        # Accept any real-valued rate (the class documents a float rate),
        # not only Python ints.
        explicit_rate = isinstance(
            sampling_rate, (int, float, np.integer, np.floating))

        if not explicit_rate:
            rates = set([v.sampling_rate for v in variables])
            if len(rates) == 1:
                sampling_rate = list(rates)[0]
            elif sampling_rate == 'auto':
                sampling_rate = max(rates)
            else:
                msg = ("Cannot merge DenseRunVariables (%s) with different"
                       " sampling rates (%s). Either specify an integer "
                       "sampling rate to use for all variables, or set "
                       "sampling_rate='auto' to use the highest sampling "
                       "rate found." % (name, rates))
                raise ValueError(msg)

        variables = [v.resample(sampling_rate) for v in variables]
        # ignore_index gives the merged values fresh 0..n-1 labels, matching
        # the entity index rebuilt by the constructor.
        values = pd.concat([v.values for v in variables], axis=0, sort=True,
                           ignore_index=True)
        run_info = list(chain(*[v.run_info for v in variables]))
        source = variables[0].source

        return DenseRunVariable(
            name=name,
            values=values,
            run_info=run_info,
            source=source,
            sampling_rate=sampling_rate)
def merge_variables(variables, name=None, **kwargs):
    '''Merge/concatenate a list of variables along the row axis.

    Parameters
    ----------
    variables : :obj:`list`
        A list of Variables to merge.
    name : :obj:`str`
        Optional name to assign to the output Variable. By default, uses the
        same name as the input variables.
    kwargs
        Optional keyword arguments to pass onto the class-specific merge() call.
        Possible args:
            - sampling_rate (int, str): The sampling rate to use if resampling
              of DenseRunVariables is necessary for harmonization. If 'auto',
              the highest sampling rate found will be used. This argument is
              only used when passing DenseRunVariables in the variables list.

    Returns
    -------
    A single BIDSVariable of the same class as the input variables.

    Raises
    ------
    ValueError
        If the variables are of more than one class, or were extracted from
        more than one type of source file.

    Notes
    -----
    - Currently, this function only supports homogeneously-typed lists. In
      future, it may be extended to support implicit conversion.
    - Variables in the list must all share the same name (i.e., it is not
      possible to merge two different variables into a single variable.)
    '''
    # Materialise once: the input is iterated several times below.
    variables = list(variables)

    classes = set([v.__class__ for v in variables])
    if len(classes) > 1:
        raise ValueError("Variables of different classes cannot be merged. "
                         "Variables passed are of classes: %s" % classes)

    sources = set([v.source for v in variables])
    if len(sources) > 1:
        raise ValueError("Variables extracted from different types of files "
                         "cannot be merged. Sources found: %s" % sources)

    # Forward `name` so the documented renaming actually takes effect.
    return list(classes)[0].merge(variables, name=name, **kwargs)