# -*- coding: utf-8 -*-
"""
This module implements NSpatial Class for NeuroChaT software.
@author: Md Nurul Islam; islammn at tcd dot ie
"""
import os
import re
import logging
from collections import OrderedDict as oDict
from copy import deepcopy
from neurochat.nc_utils import chop_edges, corr_coeff, extrema,\
find, find2d, find_chunk, histogram, histogram2d, \
linfit, residual_stat, rot_2d, smooth_1d, smooth_2d, \
centre_of_mass, find_true_ranges, RecPos
from neurochat.nc_base import NAbstract
from neurochat.nc_circular import CircStat
from neurochat.nc_hdf import Nhdf
from neurochat.nc_spike import NSpike
from neurochat.nc_lfp import NLfp
from neurochat.nc_event import NEvent
import numpy as np
import numpy.random as nprand
import scipy as sc
from scipy.optimize import curve_fit
import scipy.signal as sg
from sklearn.linear_model import LinearRegression
[docs]class NSpatial(NAbstract):
"""
This data class contains the spatial behaviour of the animal.
It decodes data from different formats and analyses the correlation
of spatial information with the spiking activity of a unit.
Attributes
----------
_time : np.ndarray
The times of each spatial sample.
_fs : float
The sampling frequency of the spatial data.
_pos_x : np.ndarray
The x position of each spatial sample.
_pos_y : np.ndarray
The y position of each spatial sample.
_direction : np.ndarray
The head direction of the animal.
_speed : np.ndarray
The speed of the animal.
_ang_val : np.ndarray
The angular head velocity of the animal.
_border_dist : np.ndarray
The distance to the border of the animal.
"""
def __init__(self, **kwargs):
"""See the class description."""
super().__init__(**kwargs)
self._time = np.array([])
self._timestamp = []
self._total_samples = []
self._fs = 0
self._pixel_size = 3
self._pos_x = []
self._pos_y = []
self._direction = np.array([])
self._speed = []
self._ang_vel = np.array([])
self._border_dist = []
self._xbound = []
self._ybound = []
self._dist_map = []
self.spike = []
self.lfp = []
self.__type = 'spatial'
[docs] def get_type(self):
"""
Return the type of object.
For NSpatial, this is always `spatial` type.
Parameters
----------
None
Returns
-------
str
"""
return self.__type
[docs] def subsample(self, sample_range=None):
"""
Extract a time range from the positions.
NOTE
----
Currently, the duration will be longer than sample time.
Duration is actually from 0 to max recording length.
This is to easier match ndata, which assumes recordings start at 0.
Parameters
----------
sample_range : tuple
the time in seconds to extract from the positions.
Returns
-------
NSpike
subsampled version of initial spatial object
"""
if sample_range is None:
return self
new_spatial = deepcopy(self)
lower, upper = sample_range
times = self._time
sample_spatial_idx = (
(times <= upper) & (times >= lower)).nonzero()
new_spatial._set_time(self._time[sample_spatial_idx])
new_spatial._set_pos_x(self._pos_x[sample_spatial_idx])
new_spatial._set_pos_y(self._pos_y[sample_spatial_idx])
new_spatial._set_direction(self._direction[sample_spatial_idx])
new_spatial._set_speed(self._speed[sample_spatial_idx])
new_spatial.set_ang_vel(self._ang_vel[sample_spatial_idx])
# NOTE can use to set proper duration
# new_spatial._set_duration(upper-lower)
return new_spatial
[docs] def set_pixel_size(self, pixel_size):
"""
Set the size of pixel size.
This is the value by which the entire foraged arena is tessellated.
Note
----
The majority of the analyses in this class instead take in this
value per function, as opposed to using this class value.
Parameters
----------
pixel_size : int
Pixel size of the foraged arena
Returns
-------
None
"""
self._pixel_size = pixel_size
def _set_time(self, time):
"""
Set the time for all the spatial samples.
It also resets the `timestamp` or the the temporal resolution
of the spatial samples and recalculates the sampling rate.
Parameters
----------
time : list or ndarray
Timestamps of all spatial samples
Returns
-------
None
"""
self._time = time
self._set_timestamp()
self._set_sampling_rate()
def _set_timestamp(self, timestamp=None):
"""
Set the timestamps for the spatial information.
Here, it is defined as the temporal resolution of the spatial samples,
not the happening time of each sample.
This way it is different from NLfp and NSpike timestamp definition.
Parameters
----------
timestamp : list or ndarray
Timestamps of all spiking waveforms
Returns
-------
None
"""
if timestamp:
self._timestamp = timestamp
elif np.array(self._time).any():
self._timestamp = np.diff(np.array(self._time)).mean()
def _set_sampling_rate(self, sampling_rate=None):
"""
Set the sampling rate of the spatial information.
Parameters
----------
sampling_rate : int
Sampling rate of the spatial information
Returns
-------
None
"""
if sampling_rate:
self._fs = sampling_rate
elif np.array(self._time).any():
self._fs = 1 / np.diff(np.array(self._time)).mean()
def _set_pos_x(self, pos_x):
"""
Set the X-coordinate of the location of the animal.
Parameters
----------
pos_x : ndarray
X-coordinate of the location of the animal
Returns
-------
None
"""
self._pos_x = pos_x
def _set_pos_y(self, pos_y):
"""
Set the Y-coordinate of the location of the animal.
Parameters
----------
pos_y : ndarray
Y-coordinate of the location of the animal
Returns
-------
None
"""
self._pos_y = pos_y
def _set_direction(self, direction):
"""
Set the head-direction of the animal.
Parameters
----------
direction : ndarray
Head-direction of the animal
Returns
-------
None
"""
self._direction = direction
def _set_speed(self, speed):
"""
Set the speed of the animal.
Parameters
----------
speed : ndarray
Speed of the animal
Returns
-------
None
"""
self._speed = speed
[docs] def set_ang_vel(self, ang_vel):
"""
Set the angular head velocity (AHV) of the animal.
Parameters
----------
ang_vel : ndarray
Angular head velocity (AHV) of the animal
Returns
-------
None
"""
self._ang_vel = ang_vel
[docs] def set_border(self, border):
"""
Set the distance of the animal from the arena border.
Parameters
----------
border : ndarray
Distance of the animal from the arena border
Returns
-------
None
"""
self._border_dist = border[0]
self._xbound = border[1]
self._ybound = border[2]
self._dist_map = border[3]
[docs] def get_total_samples(self):
"""
Return the number of spatial samples.
Parameters
----------
None
Returns
-------
int
Total spatial samples
"""
return self._time.size
[docs] def get_sampling_rate(self):
"""
Return the sampling rate of the spatial samples.
Parameters
----------
None
Returns
-------
int
Spatial data sampling rate
"""
return self._fs
[docs] def get_duration(self):
"""
Return the duration of the experiment.
Parameters
----------
None
Returns
-------
float
Duration of the experiment
"""
if len(self._time) == 0:
return 0
return self._time[-1]
[docs] def get_pixel_size(self):
"""
Return the pixel size of the recorded arena.
Parameters
----------
None
Returns
-------
int
Pixel size
"""
return self._pixel_size
[docs] def get_time(self):
"""
Return the time of individual spatial samples.
Parameters
----------
None
Returns
-------
int
Total spatial samples
"""
return self._time
[docs] def get_timestamp(self):
"""
Return the temporal resolution of spatial samples.
Parameters
----------
None
Returns
-------
int
Temporal resolution of spatial samples
"""
return self._timestamp
[docs] def get_pos_x(self):
"""
Return the X-coordinates of animal's location.
Parameters
----------
None
Returns
-------
ndarray
X-coordinates of animal's location
"""
return self._pos_x
[docs] def get_pos_y(self):
"""
Return the Y-coordinates of animal's location.
Parameters
----------
None
Returns
-------
ndarray
Y-coordinates of animal's location
"""
return self._pos_y
[docs] def get_direction(self):
"""
Return head direction of the animal.
Parameters
----------
None
Returns
-------
ndarray
Head direction of the animal
"""
return self._direction
[docs] def get_speed(self):
"""
Return speed of the animal.
Parameters
----------
None
Returns
-------
ndarray
Speed of the animal
"""
return self._speed
[docs] def get_ang_vel(self):
"""
Return angular head velocity of the animal.
Parameters
----------
None
Returns
-------
ndarray
Angular head velocity of the animal
"""
return self._ang_vel
[docs] def get_border(self):
"""
Return animal's distance from the border.
Parameters
----------
None
Returns
-------
ndarray
Animal's distance from the border
"""
return self._border_dist, self._xbound, self._ybound, self._dist_map
[docs] def set_spike(self, spike, **kwargs):
"""
Add the NSpike object to NSpatial object.
Parameters
----------
spike : NSpike
NSpike object to be added to the NSpatial object. If no spike object
is provided, a new NSpike() object is created.
**kwargs
Keyword argumemts for creating the new NSpike instance
Returns
-------
None
"""
if spike is isinstance(spike, NSpike):
self.spike = spike
else:
cls = NSpike if not spike else spike
spike = cls(**kwargs)
self.spike = spike
[docs] def set_lfp(self, lfp, **kwargs):
"""
Add the NLfp object to NSpatial object.
Parameters
----------
lfp : NLfp
NLfp object to be added to the NSpatial object. If no spike object
is provided, a new NLfp() object is created.
**kwargs
Keyword argumemts for creating the new NLfp instance
Returns
-------
None
"""
if lfp is isinstance(lfp, NLfp):
self.lfp = lfp
else:
cls = NLfp if not lfp else lfp
lfp = cls(**kwargs)
self.lfp = lfp
[docs] def set_spike_name(self, name=None):
"""
Set the name of the spike dataset.
Parameters
----------
name : str
Name of the spike dataset
Returns
-------
None
"""
if name is not None:
self.spike.set_name(name)
[docs] def set_spike_filename(self, filename=None):
"""
Set file name of the spike dataset.
Parameters
----------
name : str
Full file directory of the spike dataset
Returns
-------
None
"""
if filename is not None:
self.spike.set_filename()
[docs] def set_lfp_name(self, name=None):
"""
Set the name of the lfp dataset.
Parameters
----------
name : str
Name of the lfp dataset
Returns
-------
None
"""
self.lfp.set_name(name)
[docs] def set_lfp_filename(self, filename=None):
"""
Set file name of the lfp dataset.
Parameters
----------
name : str
Full file directory of the lfp dataset
Returns
-------
None
"""
self.lfp.set_filename(filename)
[docs] def set_event(self, event, **kwargs):
"""
Set the NEvent() object to NSpatial().
Parameters
----------
event
NEvent or its childclass or NEvent() object
Returns
-------
NEvent()
"""
if event is isinstance(event, NEvent):
self.event = event
else:
cls = NEvent if not event else event
event = cls(**kwargs)
self.event = event
[docs] def set_event_name(self, name=None):
"""
Set the name of the event object.
Parameters
----------
name : str
Name of the vent dataset
Returns
-------
None
"""
self.event.set_name(name)
[docs] def set_event_filename(self, filename=None):
"""
Set the filename for the event.
Parameters
----------
filename : str
Full file of the event dataset
Returns
-------
None
"""
self.event.set_filename(filename)
[docs] def set_system(self, system=None):
"""
Set the data format or recording system.
Parameters
----------
system : str
Data format or recording system
Returns
-------
None
"""
if system is not None:
self._system = system
if self.spike:
self.spike.set_system(system)
if self.lfp:
self.lfp.set_system(system)
[docs] def load_spike(self):
"""
Load the composing spike object.
Parameters
----------
None
Returns
-------
None
"""
self.spike.load()
[docs] def load_lfp(self):
"""
Load the composite lfp object.
Parameters
----------
None
Returns
-------
None
"""
self.lfp.load()
[docs] def save_to_hdf5(self, file_name=None, system=None):
"""
Save spatial dataset to HDF5 file.
Parameters
----------
None
Returns
-------
None
"""
hdf = Nhdf()
if file_name and system:
if os.path.exists(file_name):
self.set_filename(file_name)
self.set_system(system)
self.load()
else:
logging.error('Specified file cannot be found!')
hdf.save_spatial(spatial=self)
hdf.close()
[docs] def load(self, filename=None, system=None):
"""
Load the spatial object.
Parameters
----------
None
Returns
-------
None
"""
if system is None:
system = self._system
else:
self._system = system
if filename is None:
filename = self._filename
else:
self._filename = filename
loader = getattr(self, 'load_spatial_' + system)
loader(filename)
try:
self.smooth_speed()
except BaseException:
logging.warning(self.get_system() +
' files may not have speed data!')
self.set_ang_vel(self.calc_ang_vel())
if not (np.all(self._pos_x == 0) and np.all(self._pos_y == 0)):
self.set_border(self.calc_border())
[docs] def load_spatial_Axona(self, file_name):
"""
Load Axona format spatial data to the NSpatial() object.
Parameters
----------
None
Returns
-------
None
"""
try:
self._set_data_source(file_name)
self._set_source_format('Axona')
if file_name.endswith(".txt"):
f = open(file_name, 'rt')
while True:
line = f.readline()
if line == '':
break
elif line.startswith('time'):
spatial_data = np.loadtxt(
f, dtype='float', usecols=range(5))
self._set_time(spatial_data[:, 0])
self._set_pos_x(spatial_data[:, 1] - np.min(spatial_data[:, 1]))
self._set_pos_y(spatial_data[:, 2] - np.min(spatial_data[:, 2]))
self._set_direction(spatial_data[:, 3])
self._set_speed(spatial_data[:, 4])
f.seek(0, 0)
pixel_size = list(
map(float, re.findall(r"\d+.\d+|\d+", f.readline())))
self.set_pixel_size(pixel_size)
self.smooth_direction()
f.close()
elif file_name.endswith(".pos"):
rc = RecPos(file_name=file_name, load=True)
time = np.array([0.02 * i for i in range(rc.total_samples)])
self._set_time(time)
x, y = rc.get_position()
self._set_pos_x(x - np.min(x))
self._set_pos_y(y - np.min(y))
self._set_direction(rc.get_angular_pos())
self._set_speed(rc.get_speed())
self.set_pixel_size(rc.pixels_per_cm)
self.smooth_direction()
else:
logging.error("Position data only supports .pos or .txt")
return
except BaseException:
logging.error('File does not exist or is open in another process!')
[docs] def load_spatial_NWB(self, file_name):
"""
Load HDF5 format spatial data to the NSpatial() object.
Parameters
----------
None
Returns
-------
None
"""
file_name, path = file_name.split('+')
if os.path.exists(file_name):
hdf = Nhdf()
hdf.set_filename(file_name)
_record_info = {}
if path in hdf.f:
g = hdf.f[path]
elif '/processing/Behavioural/Position' in hdf.f:
path = '/processing/Behavioural/Position'
g = hdf.f[path]
logging.info('Path for spatial data set to: ' + path)
else:
logging.error('Path for spatial data does not exist!')
for key, value in g.attrs.items():
_record_info[key] = value
self.set_record_info(_record_info)
if path + '/' + 'location' in g:
g_loc = g[path + '/' + 'location']
data = hdf.get_dataset(group=g_loc, name='data')
self._set_pos_x(data[:, 0])
self._set_pos_y(data[:, 1])
self._set_time(hdf.get_dataset(group=g_loc, name='timestamps'))
else:
logging.error('Spatial location information not found!')
if path + '/' + 'direction' in g:
g_dir = g[path + '/' + 'direction']
data = hdf.get_dataset(group=g_dir, name='data')
self._set_direction(data)
else:
logging.error('Spatial direction information not found!')
if path + '/' + 'speed' in g:
g_speed = g[path + '/' + 'speed']
data = hdf.get_dataset(group=g_speed, name='data')
self._set_speed(data)
else:
logging.error('Spatial speed information not found!')
if path + '/' + 'angular velocity' in g:
g_ang_vel = g[path + '/' + 'angular velocity']
data = hdf.get_dataset(group=g_ang_vel, name='data')
self.set_ang_vel(data)
else:
self.set_ang_vel(np.array([]))
logging.warning(
'Spatial angular velocity information not found, will be calculated from direction!')
hdf.close()
[docs] def load_spatial_Neuralynx(self, file_name):
"""
Load Neuralynx format spatial data to the NSpatial() object.
Parameters
----------
None
Returns
-------
None
"""
self._set_data_source(file_name)
self._set_source_format('Neuralynx')
# Format description for the NLX file:
header_offset = 16 * 1024 # fixed for NLX files
bytes_start_record = 2
bytes_origin_id = 2
bytes_videoRec_size = 2
bytes_per_timestamp = 8
bytes_per_bitfield = 4 * 400
bytes_sncrc = 2
bytes_per_xloc = 4
bytes_per_yloc = 4
bytes_per_angle = 4
bytes_per_target = 4 * 50
record_size = None
with open(file_name, 'rb') as f:
while True:
line = f.readline()
try:
line = line.decode('UTF-8')
except BaseException:
break
if line == '':
break
if 'SamplingFrequency' in line:
self._set_sampling_rate(
float(''.join(re.findall(r'\d+.\d+|\d+', line))))
if 'RecordSize' in line:
record_size = int(
''.join(re.findall(r'\d+.\d+|\d+', line)))
if 'Time Opened' in line:
self._set_date(re.search(r'\d+/\d+/\d+', line).group())
if 'FileVersion' in line:
self._set_file_version(line.split()[1])
if not record_size:
record_size = bytes_start_record + \
bytes_origin_id + \
bytes_videoRec_size + \
bytes_per_timestamp + \
bytes_per_bitfield + \
bytes_sncrc + \
bytes_per_xloc + \
bytes_per_yloc + \
bytes_per_angle + \
bytes_per_target
time_offset = bytes_start_record + \
bytes_origin_id + \
bytes_videoRec_size
xloc_offset = time_offset + \
bytes_per_timestamp + \
bytes_per_bitfield + \
bytes_sncrc
yloc_offset = xloc_offset + bytes_per_xloc
angle_offset = yloc_offset + bytes_per_xloc
f.seek(0, 2)
self._total_samples = int((f.tell() - header_offset) / record_size)
spatial_data = np.zeros([self._total_samples, 4])
f.seek(header_offset)
for i in np.arange(self._total_samples):
sample_bytes = np.fromfile(f, dtype='uint8', count=record_size)
spatial_data[i, 0] = int.from_bytes(
sample_bytes[time_offset + np.arange(bytes_per_timestamp)],
byteorder='little', signed=False)
spatial_data[i, 1] = int.from_bytes(
sample_bytes[xloc_offset + np.arange(bytes_per_xloc)],
byteorder='little', signed=False)
spatial_data[i, 2] = int.from_bytes(
sample_bytes[yloc_offset + np.arange(bytes_per_yloc)],
byteorder='little', signed=False)
spatial_data[i, 3] = int.from_bytes(
sample_bytes[angle_offset + np.arange(bytes_per_angle)],
byteorder='little', signed=False)
spatial_data[:, 0] /= 10**6
spatial_data[:, 0] -= np.min(spatial_data[:, 0])
self._timestamp = np.mean(np.diff(spatial_data[:, 0]))
self._set_sampling_rate(1 / self._timestamp)
self._set_time(spatial_data[:, 0])
self._set_pos_x(spatial_data[:, 1] - np.min(spatial_data[:, 1]))
self._set_pos_y(spatial_data[:, 2] - np.min(spatial_data[:, 2]))
self._set_direction(spatial_data[:, 3])
# Neuralynx data does not have any speed information
self.smooth_direction()
[docs] def smooth_speed(self):
"""
Smooth the speed data using a moving-average box filter.
Parameters
----------
None
Returns
-------
None
"""
self._set_speed(smooth_1d(self.get_speed(), 'b', 5))
[docs] def smooth_direction(self):
"""
Smooth the angular head direction using a moving circular average.
Parameters
----------
None
Returns
-------
None
See also
--------
nc_circular.CircStat().circ_smooth()
"""
cs = CircStat()
cs.set_theta(self.get_direction())
self._set_direction(cs.circ_smooth(filttype='b', filtsize=5))
[docs] def calc_ang_vel(self, npoint=5):
"""
Calculate the angular head velocity of the animal from the direction.
Each sample is the slope of a fitted line of five directional data
centred around current sample.
Parameters
----------
None
Returns
-------
None
"""
theta = self.get_direction()
if theta.size == 0:
logging.warning(
"No head direction data available for angular velocity")
return np.nan
ang_vel = np.zeros(theta.shape)
N = theta.size
L = npoint
l = int(np.floor(L / 2))
cs = CircStat()
for i in np.arange(l):
y = cs.circ_regroup(theta[:L - l + i])
ang_vel[i] = np.polyfit(np.arange(len(y)), y, 1)[0]
for i in np.arange(l, N - l, 1):
y = cs.circ_regroup(theta[i - l:i + l + 1])
ang_vel[i] = np.polyfit(np.arange(len(y)), y, 1)[0]
for i in np.arange(N - l, N):
y = cs.circ_regroup(theta[i - l:])
ang_vel[i] = np.polyfit(np.arange(len(y)), y, 1)[0]
return ang_vel * self.get_sampling_rate()
[docs] def calc_border(self, **kwargs):
"""
Identify the border of the recording arena.
This is inferred from the trace of the foraging of the animal in the arena.
Parameters
----------
**kwargs
Keyword arguments
Returns
-------
border_dist : ndarray
Distance of the animal from the border at each behavioural samples
xedges : ndarray
Pixelated edge of the x-axis
yedges : ndarray
Pixelated edge of the y-axis
dist_mat : ndarray
A matrix of distance of each pixel of the arena from the identified
border
"""
pixel = kwargs.get('pixel', 3)
chop_bound = kwargs.get('chop_bound', 5)
if len(self._pos_x) == 0:
raise ValueError(
"No positional data to calculate border from")
xedges = np.arange(0, np.ceil(np.max(self._pos_x)), pixel)
yedges = np.arange(0, np.ceil(np.max(self._pos_y)), pixel)
tmap, yedges, xedges = histogram2d(
self._pos_y, self._pos_x, yedges, xedges)
if abs(xedges.size - yedges.size) <= chop_bound:
tmap = chop_edges(tmap, min(tmap.shape), min(tmap.shape))[2]
else:
tmap = chop_edges(tmap, tmap.shape[1], tmap.shape[0])[2]
ybin, xbin = tmap.shape
if ybin == 1 or xbin == 1:
logging.error("could not calculate border as subject did not move")
return None, None, None, None
border = np.zeros(tmap.shape)
border[tmap > 0] = False
border[tmap == 0] = True
for J in np.arange(ybin):
for I in np.arange(xbin):
if not border[J, I] and (
J == ybin - 1 or J == 0 or I == xbin - 1 or I == 0):
border[J, I] = True
# Optimize the border
optBorder = np.zeros(border.shape)
for i in np.arange(border.shape[0]):
for j in np.arange(border.shape[1]):
if border[i, j]:
if i == 0: # along the 1st row
if border[i, j] != border[i + 1, j]:
optBorder[i, j] = True
elif j == 0: # along the 1st column
if border[i, j] != border[i, j + 1]:
optBorder[i, j] = True
elif i == border.shape[0] - 1: # along the last row
if border[i, j] != border[i - 1, j]:
optBorder[i, j] = True
elif j == border.shape[1] - 1: # along the last column
if border[i, j] != border[i, j - 1]:
optBorder[i, j] = True
else: # other cases
if (border[i, j] != border[i, j + 1]) or (border[i, j] != border[i + 1, j])\
or (border[i, j] != border[i, j - 1]) or (border[i, j] != border[i - 1, j]):
optBorder[i, j] = True
border = optBorder
xborder = np.zeros(tmap.shape, dtype=bool)
yborder = np.zeros(tmap.shape, dtype=bool)
for J in np.arange(ybin):
# 1 added/subed to the next pixel of the traversed arena as the
# border
xborder[J, find(border[J, :], 1, 'first')] = True
xborder[J, find(border[J, :], 1, 'last')] = True
for I in np.arange(xbin):
yborder[find(border[:, I], 1, 'first'), I] = True
yborder[find(border[:, I], 1, 'last'), I] = True
# self.border = border
border = xborder | yborder
self.tmap = tmap * self._timestamp
distMat = np.zeros(border.shape)
xx, yy = np.meshgrid(np.arange(xbin), np.arange(ybin))
borderDist = np.zeros(self._time.size)
xedges = np.arange(xbin) * pixel
yedges = np.arange(ybin) * pixel
xind = histogram(self._pos_x, xedges)[1]
yind = histogram(self._pos_y, yedges)[1]
for J in np.arange(ybin):
for I in np.arange(xbin):
dist_arr = (
np.abs(xx[border] - xx[J, I]) +
np.abs(yy[border] - yy[J, I]))
if dist_arr.size == 0:
logging.error("could not calculate border")
return None, None, None, None
tmp_dist = np.min(dist_arr)
if find(np.logical_and(xind == I, yind == J)).size:
borderDist[np.logical_and(xind == I, yind == J)] = tmp_dist
distMat[J, I] = tmp_dist
dist_mat = distMat * pixel
border_dist = borderDist * pixel
return border_dist, xedges, yedges, dist_mat
[docs] @staticmethod
def skaggs_info(firing_rate, visit_time):
"""
Calculate the Skaggs information content of the spatial firing.
Parameters
----------
firing_rate : ndarray
Firing rate of the unit at each pixelated location or binned information,
i.e., binned speed or head-direction
visit_time : ndarray
Amount of time animal spent in each pixel or bin
Returns
-------
float
Skaggs information content
"""
firing_rate[np.isnan(firing_rate)] = 0
Li = firing_rate # Lambda
L = np.sum(firing_rate * visit_time) / visit_time.sum()
P = visit_time / visit_time.sum()
return np.sum(
P[Li > 0] * (Li[Li > 0] / L) * np.log2(Li[Li > 0] / L))
[docs] @staticmethod
def spatial_sparsity(firing_rate, visit_time):
"""
Calculate the spatial sparsity of the spatial firing.
Parameters
----------
firing_rate : ndarray
Firing rate of the unit at each pixelated location
visit_time : ndarray
Amount of time animal spent in each pixel
Returns
-------
float
Spatial sparsity
"""
firing_rate[np.isnan(firing_rate)] = 0
Li = firing_rate # Lambda
# L = np.sum(firing_rate*visit_time)/ visit_time.sum()
P = visit_time / visit_time.sum()
return np.sum(P * Li)**2 / np.sum(P * Li**2)
[docs] def speed(self, ftimes, **kwargs):
"""
Calculate the firing rate of the unit at different binned speeds.
The spike rate vs speed is fitted with a linear equation and goodness of fit
is measured
Parameters
----------
ftimes : ndarray
Timestamps of the spiking activity of a unit
**kwargs
Keyword arguments
Returns
-------
dict
Graphical data of the analysis
"""
_results = oDict()
graph_data = {}
# When update = True, it will use the
update = kwargs.get('update', True)
# results for statistics, if False,
# i.e. in Multiple Regression, it will ignore updating
binsize = kwargs.get('binsize', 1)
min_speed, max_speed = kwargs.get('range', [0, 40])
speed_ = self.get_speed()
max_speed = min(max_speed, np.ceil(speed_.max() / binsize) * binsize)
min_speed = max(min_speed, np.floor(speed_.min() / binsize) * binsize)
bins = np.arange(min_speed, max_speed, binsize)
vid_count = histogram(ftimes, self.get_time())[0]
visit_time, speedInd = histogram(speed_, bins)[0:2]
visit_time = visit_time / self.get_sampling_rate()
count = np.array(
[sum(vid_count[speedInd == i]) for i in range(len(bins))]
).astype(np.float64)
rate = np.divide(
count, visit_time, out=np.zeros_like(count), where=(visit_time != 0))
_results['Speed Skaggs'] = self.skaggs_info(rate, visit_time)
rate = rate[visit_time > 1]
bins = bins[visit_time > 1]
fit_result = linfit(bins, rate)
_results['Speed Pears R'] = fit_result['Pearson R']
_results['Speed Pears P'] = fit_result['Pearson P']
_results['Mean Speed'] = np.mean(speed_)
graph_data['bins'] = bins
graph_data['rate'] = rate
graph_data['fitRate'] = fit_result['yfit']
if update:
self.update_result(_results)
return graph_data
[docs] def angular_velocity(self, ftimes, **kwargs):
"""
Calculate the firing rate of the unit at binned angular head velocity.
The spike rate vs speed is fitted with a linear equation individually
for the negative and positive angular velocities, and goodness of fit
is measured
Parameters
----------
ftimes : ndarray
Timestamps of the spiking activity of a unit
**kwargs
Keyword arguments
Returns
-------
dict
Graphical data of the analysis
"""
_results = oDict()
graph_data = {}
# When update = True, it will use the
update = kwargs.get('update', True)
# results for statistics, if False,
# i.e. in Multiple Regression, it will ignore updating
binsize = kwargs.get('binsize', 10)
min_vel, max_vel = kwargs.get('range', [-100, 100])
cutoff = kwargs.get('cutoff', 10)
ang_vel = self.get_ang_vel()
max_vel = min(max_vel, np.ceil(ang_vel.max() / binsize) * binsize)
min_vel = max(min_vel, np.floor(ang_vel.min() / binsize) * binsize)
bins = np.arange(min_vel, max_vel, binsize)
vid_count = histogram(ftimes, self.get_time())[0]
visit_time, velInd = histogram(ang_vel, bins)[0:2]
visit_time = visit_time / self.get_sampling_rate()
rate = np.array([sum(vid_count[velInd == i])
for i in range(len(bins))]) / visit_time
rate[np.isnan(rate)] = 0
_results['speedSkaggs'] = self.skaggs_info(rate, visit_time)
rate = rate[visit_time > 1]
bins = bins[visit_time > 1]
fit_result = linfit(bins[bins <= -cutoff], rate[bins <= -cutoff])
_results['Ang Vel Left Pears R'] = fit_result['Pearson R']
_results['Ang Vel Left Pears P'] = fit_result['Pearson P']
graph_data['leftBins'] = bins[bins <= -cutoff]
graph_data['leftRate'] = rate[bins <= -cutoff]
graph_data['leftFitRate'] = fit_result['yfit']
fit_result = linfit(bins[bins >= cutoff], rate[bins >= cutoff])
_results['Ang Vel Right Pears R'] = fit_result['Pearson R']
_results['Ang Vel Right Pears P'] = fit_result['Pearson P']
graph_data['rightBins'] = bins[bins >= cutoff]
graph_data['rightRate'] = rate[bins >= cutoff]
graph_data['rightFitRate'] = fit_result['yfit']
if update:
self.update_result(_results)
return graph_data
[docs] def place(self, ftimes, **kwargs):
"""
Calculate location based measures, used in Firing map.
This is used to get the two-dimensional firing rate of the unit
with respect to the location of the animal in the environment.
This is commonly called the Firing map.
Specificity indices are measured to assess the quality of
location-specific firing of the unit.
This method is also used to get the events of spike occurring superimposed
on the trace of the animal in the arena, commonly known as Spike Plot.
Parameters
----------
ftimes : ndarray
Timestamps of the spiking activity of a unit
**kwargs
Keyword arguments
Returns
-------
dict
Graphical data of the analysis
"""
_results = oDict()
graph_data = {}
update = kwargs.get('update', True)
pixel = kwargs.get('pixel', 3)
chop_bound = kwargs.get('chop_bound', 5)
filttype, filtsize = kwargs.get('filter', ['b', 5])
lim = kwargs.get('range', [0, self.get_duration()])
brAdjust = kwargs.get('brAdjust', True)
thresh = kwargs.get('fieldThresh', 0.2)
required_neighbours = kwargs.get('minPlaceFieldNeighbours', 9)
smooth_place = kwargs.get('smoothPlace', False)
# Can pass another NData object to estimate the border from
# Can be useful in some cases, such as when the animal
# only explores a subset of the arena.
separate_border_data = kwargs.get(
"separateBorderData", None)
# xedges = np.arange(0, np.ceil(np.max(self._pos_x)), pixel)
# yedges = np.arange(0, np.ceil(np.max(self._pos_y)), pixel)
# Update the border to match the requested pixel size
if separate_border_data is not None:
self.set_border(
separate_border_data.calc_border(**kwargs))
times = self._time
lower, upper = (times.min(), times.max())
new_times = separate_border_data._time
sample_spatial_idx = (
(new_times <= upper) & (new_times >= lower)).nonzero()
self._border_dist = self._border_dist[sample_spatial_idx]
else:
self.set_border(self.calc_border(**kwargs))
xedges = self._xbound
yedges = self._ybound
spikeLoc = self.get_event_loc(ftimes, **kwargs)[1]
posX = self._pos_x[np.logical_and(
self.get_time() >= lim[0], self.get_time() <= lim[1])]
posY = self._pos_y[np.logical_and(
self.get_time() >= lim[0], self.get_time() <= lim[1])]
tmap, yedges, xedges = histogram2d(posY, posX, yedges, xedges)
if tmap.shape[0] != tmap.shape[1] & np.abs(
tmap.shape[0] - tmap.shape[1]) <= chop_bound:
tmap = chop_edges(tmap, min(tmap.shape), min(tmap.shape))[2]
tmap /= self.get_sampling_rate()
ybin, xbin = tmap.shape
xedges = np.arange(xbin) * pixel
yedges = np.arange(ybin) * pixel
spike_count = histogram2d(spikeLoc[1], spikeLoc[0], yedges, xedges)[0]
fmap = np.divide(spike_count, tmap, out=np.zeros_like(
spike_count), where=tmap != 0)
if brAdjust:
nfmap = fmap / fmap.max()
if np.sum(np.logical_and(nfmap >= 0.2, tmap != 0)
) >= 0.8 * nfmap[tmap != 0].flatten().shape[0]:
back_rate = np.mean(
fmap[np.logical_and(nfmap >= 0.2, nfmap < 0.4)])
fmap -= back_rate
fmap[fmap < 0] = 0
if filttype is not None:
smoothMap = smooth_2d(fmap, filttype, filtsize)
else:
smoothMap = fmap
if smooth_place:
pmap = smoothMap
else:
pmap = fmap
pmap[tmap == 0] = None
pfield, largest_group = NSpatial.place_field(
pmap, thresh, required_neighbours)
# if largest_group == 0:
# if smooth_place:
# info = "where the place field was calculated from smoothed data"
# else:
# info = "where the place field was calculated from raw data"
# logging.info(
# "Lack of high firing neighbours to identify place field " +
# info)
centroid = NSpatial.place_field_centroid(pfield, pmap, largest_group)
# centroid is currently in co-ordinates, convert to pixels
centroid = centroid * pixel + (pixel * 0.5)
# flip x and y
centroid = centroid[::-1]
maxes = [xedges.max(), yedges.max()]
scales = (pixel, pixel)
co_ords = np.array(np.where(pfield == largest_group))
boundary = [[None, None], [None, None]]
for i in range(2):
j = (i + 1) % 2
boundary[i] = (
co_ords[j].min() * scales[i],
np.clip((co_ords[j].max() + 1) * scales[i], 0, maxes[i]))
spike_counts_place_field = spike_count[
co_ords[0], co_ords[1]]
number_of_spikes_in_place_field = np.sum(spike_counts_place_field)
max_firing_rate_in_place_field = pmap[
co_ords[0], co_ords[1]].max()
# inside_x = (
# (boundary[0][0] <= spikeLoc[0]) &
# (spikeLoc[0] <= boundary[0][1]))
# inside_y = (
# (boundary[1][0] <= spikeLoc[1]) &
# (spikeLoc[1] <= boundary[1][1]))
# co_ords = np.nonzero(np.logical_and(inside_x, inside_y))
if update:
_results['Spatial Skaggs'] = self.skaggs_info(fmap, tmap)
_results['Spatial Sparsity'] = self.spatial_sparsity(fmap, tmap)
_results['Spatial Coherence'] = np.corrcoef(
fmap[tmap != 0].flatten(), smoothMap[tmap != 0].flatten())[0, 1]
_results['Peak Firing Rate'] = fmap.max()
_results['Found strong place field'] = (largest_group != 0)
_results['Place field Centroid x'] = centroid[0]
_results['Place field Centroid y'] = centroid[1]
_results['Place field Boundary x'] = boundary[0]
_results['Place field Boundary y'] = boundary[1]
_results['Number of Spikes in Place Field'] = (
number_of_spikes_in_place_field)
_results['Percentage of Spikes in Place Field'] = (
number_of_spikes_in_place_field * 100 / ftimes.size)
_results['Max Firing Rate in Place Field'] = (
max_firing_rate_in_place_field)
self.update_result(_results)
smoothMap[tmap == 0] = None
graph_data['posX'] = posX
graph_data['posY'] = posY
graph_data['fmap'] = fmap
graph_data['smoothMap'] = smoothMap
graph_data['firingMap'] = fmap
graph_data['tmap'] = tmap
graph_data['xedges'] = xedges
graph_data['yedges'] = yedges
graph_data['spikeLoc'] = spikeLoc
graph_data['placeField'] = pfield
graph_data['largestPlaceGroup'] = largest_group
graph_data['placeBoundary'] = boundary
graph_data['indicesInPlaceField'] = co_ords
graph_data['centroid'] = centroid
graph_data['spikeCounts'] = spike_count
return graph_data
[docs] def non_moving_periods(self, **kwargs):
"""
Return tuples indicating ranges where the subject is not moving.
Keyword Arguments
-----------------
should_smooth : bool
flags if the speed data should be smoothed, default False
min_range : float
the minimum amount of time that the subject should not be moving for
moving_thresh : float
any speed above this thresh is considered to be movement
Returns
-------
list of tuples
"""
should_smooth = kwargs.get("should_smooth", False)
min_range = kwargs.get("min_range", 150)
moving_thresh = kwargs.get("moving_thresh", 2.5)
if should_smooth:
self.smooth_speed()
not_moving = self.get_speed() < moving_thresh
return find_true_ranges(
self.get_time(), not_moving, min_range)
[docs] def get_non_moving_times(self, **kwargs):
"""
Return the times where the subject is not moving.
Keyword arguments are passed to non_moving_periods()
Returns
-------
list
A list of times where the subject is not moving
"""
ranges = self.non_moving_periods(**kwargs)
time_data = [
val for val in self.get_time()
if any(lower <= val <= upper for (lower, upper) in ranges)
]
return time_data
[docs] def loc_time_lapse(self, ftimes, **kwargs):
"""
Calculate the firing rate map over small time intervals.
This method is useful in observing
the evolution of unit-activity as the animal traverses the environment.
Following intervals are used:
0-1min, 0-2min, 0-4min, 0-8min, 0-16min or 0-end
depending on the recording duration
0-1min, 1-2min, 2-4min, 4-8min, 8-16min or 16-end
depending on the recording duration
Parameters
----------
ftimes : ndarray
Timestamps of the spiking activity of a unit
**kwargs
Keyword arguments
Returns
-------
dict
Graphical data of the analysis
"""
graph_data = oDict()
pixel = kwargs.get('pixel', 3)
chop_bound = kwargs.get('chop_bound', 5)
filter = kwargs.get('filter', ['b', 5])
brAdjust = kwargs.get('brAdjust', True)
lim = [0, 1 * 60]
graph_data['0To1min'] = self.place(
ftimes, range=lim, filter=filter,
chop_bound=chop_bound, pixel=pixel,
brAdjust=brAdjust, update=False)
lim = [0, 2 * 60]
graph_data['0To2min'] = self.place(
ftimes, range=lim, filter=filter,
chop_bound=chop_bound, pixel=pixel,
brAdjust=brAdjust, update=False)
lim = [0, 4 * 60]
graph_data['0To4min'] = self.place(
ftimes, range=lim, filter=filter,
chop_bound=chop_bound, pixel=pixel,
brAdjust=brAdjust, update=False)
lim = [0, 8 * 60]
graph_data['0To8min'] = self.place(
ftimes, range=lim, filter=filter,
chop_bound=chop_bound, pixel=pixel,
brAdjust=brAdjust, update=False)
# 0-1min, 1-2min, 2-4min, 4-8min
lim = [1 * 60, 2 * 60]
graph_data['1To2min'] = self.place(
ftimes, range=lim, filter=filter,
chop_bound=chop_bound, pixel=pixel,
brAdjust=brAdjust, update=False)
lim = [2 * 60, 4 * 60]
graph_data['2To4min'] = self.place(
ftimes, range=lim, filter=filter,
chop_bound=chop_bound, pixel=pixel,
brAdjust=brAdjust, update=False)
lim = [4 * 60, 8 * 60]
graph_data['4To8min'] = self.place(
ftimes, range=lim, filter=filter,
chop_bound=chop_bound, pixel=pixel,
brAdjust=brAdjust, update=False)
# 0-16min, 8-16min 0-20min, 16-20min
if self.get_duration() > 8 * 60:
if self.get_duration() > 16 * 60:
lim = [0, 16 * 60]
graph_data['0To16min'] = self.place(
ftimes, range=lim, filter=filter,
chop_bound=chop_bound, pixel=pixel,
brAdjust=brAdjust, update=False)
lim = [8 * 60, 16 * 60]
graph_data['8To16min'] = self.place(
ftimes, range=lim, filter=filter,
chop_bound=chop_bound, pixel=pixel,
brAdjust=brAdjust, update=False)
lim = [0, self.get_duration()]
graph_data['0ToEnd'] = self.place(
ftimes, range=lim, filter=filter,
chop_bound=chop_bound, pixel=pixel,
brAdjust=brAdjust, update=False)
lim = [16 * 60, self.get_duration()]
graph_data['16ToEnd'] = self.place(
ftimes, range=lim, filter=filter,
chop_bound=chop_bound, pixel=pixel,
brAdjust=brAdjust, update=False)
else:
lim = [0, self.get_duration()]
graph_data['0ToEnd'] = self.place(
ftimes, range=lim, filter=filter,
chop_bound=chop_bound, pixel=pixel,
brAdjust=brAdjust, update=False)
lim = [8 * 60, self.get_duration()]
graph_data['8ToEnd'] = self.place(
ftimes, range=lim, filter=filter,
chop_bound=chop_bound, pixel=pixel,
brAdjust=brAdjust, update=False)
return graph_data
[docs] def hd_rate(self, ftimes, **kwargs):
"""
Calculate head directional firing related measures and plots.
The firing rate of the unit with respect to the head-
direciton of the animal in the environment is called the Tuning curve.
Precited firing map from the locational firing is also calculated and
distributive ratio is measured along with the Skaggs information.
Spike-plot similar to locational firing is developed but in the circular bins
which shows the direction of the animal's head at each spike's occurring time.
Parameters
----------
ftimes : ndarray
Timestamps of the spiking activity of a unit
**kwargs
Keyword arguments
Returns
-------
dict
Graphical data of the analysis
"""
_results = oDict()
graph_data = {}
update = kwargs.get('update', True)
binsize = kwargs.get('binsize', 5) # in degrees
filttype, filtsize = kwargs.get('filter', ['b', 5])
lim = kwargs.get('range', [0, self.get_duration()])
bins = np.arange(0, 360, binsize)
spike_hd = self.get_event_loc(ftimes, **kwargs)[2]
direction = self.get_direction()[np.logical_and(
self.get_time() >= lim[0], self.get_time() <= lim[1])]
tcount, ind, bins = histogram(direction, bins)
tcount = tcount / self.get_sampling_rate()
spike_count = histogram(spike_hd, bins)[0].astype(tcount.dtype)
hd_rate = np.divide(spike_count, tcount, out=np.zeros_like(
spike_count), where=tcount != 0, casting='unsafe')
extra_for_smooth = int(np.floor(filtsize / 2))
to_smooth_hd = np.concatenate([
hd_rate[len(hd_rate) - extra_for_smooth:],
hd_rate,
hd_rate[:extra_for_smooth]])
smoothRate = smooth_1d(to_smooth_hd, filttype, filtsize)
smoothRate = smoothRate[2:len(smoothRate) - 2]
if update:
_results['HD Skaggs'] = self.skaggs_info(hd_rate, tcount)
cs = CircStat(rho=smoothRate, theta=bins)
results = cs.calc_stat()
_results['HD Rayl Z'] = results['RaylZ']
_results['HD Rayl P'] = results['RaylP']
_results['HD von Mises K'] = results['vonMisesK']
_results['HD Mean'] = results['meanTheta']
_results['HD Mean Rate'] = results['meanRho']
_results['HD Res Vect'] = results['resultant']
binInterp = np.arange(360)
rateInterp = np.interp(binInterp, bins, smoothRate)
_results['HD Peak Rate'] = np.amax(rateInterp)
_results['HD Peak'] = binInterp[np.argmax(rateInterp)]
half_max = np.amin(rateInterp) + \
(np.amax(rateInterp) - np.amin(rateInterp)) / 2
d = np.sign(half_max - rateInterp[0:-1]) - \
np.sign(half_max - rateInterp[1:])
left_possible = find(d > 0)
if len(left_possible) > 0:
left_idx = left_possible[0]
else:
left_idx = None
right_possible = find(d < 0)
if len(right_possible) > 0:
right_idx = right_possible[-1]
else:
right_idx = None
_results['HD Half Width'] = None if (not left_idx or not right_idx or left_idx > right_idx) \
else binInterp[right_idx] - binInterp[left_idx]
pixel = kwargs.get('pixel', 3)
placeData = self.place(ftimes, pixel=pixel, update=False)
fmap = placeData['smoothMap']
fmap[np.isnan(fmap)] = 0
hdPred = np.zeros(bins.size)
for i, b in enumerate(bins):
hdInd = np.logical_and(direction >= b, direction < b + binsize)
tmap = histogram2d(self.get_pos_y()[hdInd], self.get_pos_x()[
hdInd], placeData['yedges'], placeData['xedges'])[0]
tmap /= self.get_sampling_rate()
hdPred[i] = np.sum(fmap * tmap) / tmap.sum()
to_smooth_hd_pred = np.concatenate([
hdPred[len(hdPred) - extra_for_smooth:],
hdPred,
hdPred[:extra_for_smooth]])
smoothRatePred = smooth_1d(to_smooth_hd_pred, filttype, filtsize)
smoothRatePred = smoothRatePred[2:len(smoothRatePred) - 2]
graph_data['hdPred'] = smoothRatePred
self.update_result(_results)
graph_data['hd'] = direction
graph_data['hdRate'] = hd_rate
graph_data['smoothRate'] = smoothRate
graph_data['tcount'] = tcount
graph_data['bins'] = bins
graph_data['spike_hd'] = spike_hd
cs = CircStat()
cs.set_theta(spike_hd)
graph_data['scatter_radius'], graph_data['scatter_bins'] = cs.circ_scatter(
bins=2, step=0.05)
return graph_data
[docs] def hd_rate_ccw(self, ftimes, **kwargs):
"""
Calculate the tuning curve but split by direction.
Splits are into clock-wise vs counterclockwise head-directional movement.
Parameters
----------
ftimes : ndarray
Timestamps of the spiking activity of a unit
**kwargs
Keyword arguments
Returns
-------
dict
Graphical data of the analysis
"""
_results = oDict()
graph_data = {}
update = kwargs.get('update', True)
binsize = kwargs.get('binsize', 5) # in degrees
filttype, filtsize = kwargs.get('filter', ['b', 5])
lim = kwargs.get('range', [0, self.get_duration()])
thresh = kwargs.get('thresh', 30)
edges = np.arange(0, 360, binsize)
spikeInd, spikeLoc, spike_hd = self.get_event_loc(ftimes, **kwargs)
vidInd = np.logical_and(
self.get_time() >= lim[0], self.get_time() <= lim[1])
direction = self.get_direction()[vidInd]
ccwSpike_hd = spike_hd[self.get_ang_vel()[spikeInd] < -thresh]
cwSpike_hd = spike_hd[self.get_ang_vel()[spikeInd] > thresh]
ccw_dir = direction[self.get_ang_vel()[vidInd] < -thresh]
cw_dir = direction[self.get_ang_vel()[vidInd] > thresh]
tcount, ind, bins = histogram(cw_dir, edges)
tcount = tcount / self.get_sampling_rate()
spike_count = histogram(cwSpike_hd, edges)[0].astype(tcount.dtype)
cwRate = np.divide(spike_count, tcount, out=np.zeros_like(
spike_count), where=tcount != 0, casting='unsafe')
extra_for_smooth = int(np.floor(filtsize / 2))
to_smooth_hd = np.concatenate([
cwRate[len(cwRate) - extra_for_smooth:],
cwRate,
cwRate[:extra_for_smooth]])
smoothRate = smooth_1d(to_smooth_hd, filttype, filtsize)
smoothRate = smoothRate[2:len(smoothRate) - 2]
smoothcwRate = smoothRate
tcount, ind, bins = histogram(ccw_dir, edges)
tcount = tcount / self.get_sampling_rate()
spike_count = histogram(ccwSpike_hd, edges)[0].astype(tcount.dtype)
ccwRate = np.divide(spike_count, tcount, out=np.zeros_like(
spike_count), where=tcount != 0, casting='unsafe')
to_smooth_hd = np.concatenate([
ccwRate[len(ccwRate) - extra_for_smooth:],
ccwRate,
ccwRate[:extra_for_smooth]])
smoothRate = smooth_1d(to_smooth_hd, filttype, filtsize)
smoothRate = smoothRate[2:len(smoothRate) - 2]
smoothccwRate = smoothRate
if update:
binInterp = np.arange(360)
rateInterp = np.interp(binInterp, bins, smoothcwRate)
peak_rate_cw = np.amax(rateInterp)
peak_cw = binInterp[np.argmax(rateInterp)]
rateInterp = np.interp(binInterp, bins, smoothccwRate)
peak_rate_ccw = np.amax(rateInterp)
peak_ccw = binInterp[np.argmax(rateInterp)]
_results['HD Delta'] = (peak_rate_ccw - peak_rate_cw)
_results['HD Peak CW'] = peak_cw
_results['HD Peak CCW'] = peak_ccw
_results['HD Peak Rate CW'] = peak_rate_cw
_results['HD Peak Rate CCW'] = peak_rate_ccw
self.update_result(_results)
graph_data['bins'] = bins
graph_data['hdRateCW'] = smoothcwRate
graph_data['hdRateCCW'] = smoothccwRate
return graph_data
[docs] def hd_time_lapse(self, ftimes):
"""
Calculate the tuning curve over small time intervals.
This method is useful in observing the
evolution of unit-activity as the animal traverses the environment.
Following intervals ar used:
0-1min, 0-2min, 0-4min, 0-8min, 0-16min or 0-end
depending on the recording duration
0-1min, 1-2min, 2-4min, 4-8min, 8-16min or 16-end
depending on the recording duration
Parameters
----------
ftimes : ndarray
Timestamps of the spiking activity of a unit
**kwargs
Keyword arguments
Returns
-------
dict
Graphical data of the analysis
"""
# Breaking down the spike plot for firing evolution
# 0-1min, 0-2min, 0-4min, 0-8min
graph_data = oDict()
lim = [0, 1 * 60]
graph_data['0To1min'] = self.hd_rate(ftimes, range=lim, update=False)
lim = [0, 2 * 60]
graph_data['0To2min'] = self.hd_rate(ftimes, range=lim, update=False)
lim = [0, 4 * 60]
graph_data['0To4min'] = self.hd_rate(ftimes, range=lim, update=False)
lim = [0, 8 * 60]
graph_data['0To8min'] = self.hd_rate(ftimes, range=lim, update=False)
# 0-1min, 1-2min, 2-4min, 4-8min
lim = [1 * 60, 2 * 60]
graph_data['1To2min'] = self.hd_rate(ftimes, range=lim, update=False)
lim = [2 * 60, 4 * 60]
graph_data['2To4min'] = self.hd_rate(ftimes, range=lim, update=False)
lim = [4 * 60, 8 * 60]
graph_data['4To8min'] = self.hd_rate(ftimes, range=lim, update=False)
# 0-16min, 8-16min 0-20min, 16-20min
if self.get_duration() > 8 * 60:
if self.get_duration() > 16 * 60:
lim = [0, 16 * 60]
graph_data['0To16min'] = self.hd_rate(
ftimes, range=lim, update=False)
lim = [8 * 60, 16 * 60]
graph_data['8To16min'] = self.hd_rate(
ftimes, range=lim, update=False)
lim = [0, self.get_duration()]
graph_data['0ToEnd'] = self.hd_rate(
ftimes, range=lim, update=False)
lim = [16 * 60, self.get_duration()]
graph_data['16ToEnd'] = self.hd_rate(
ftimes, range=lim, update=False)
else:
lim = [0, self.get_duration()]
graph_data['0ToEnd'] = self.hd_rate(
ftimes, range=lim, update=False)
lim = [8 * 60, self.get_duration()]
graph_data['8ToEnd'] = self.hd_rate(
ftimes, range=lim, update=False)
return graph_data
[docs] def hd_shuffle(self, ftimes, **kwargs):
"""
Shuffling analysis of the unit.
This is used to see if the head-directional firing
specifity is by chance or actually correlated to the head-direction of
the animal.
Parameters
----------
ftimes : ndarray
Timestamps of the spiking activity of a unit
**kwargs
Keyword arguments
Returns
-------
dict
Graphical data of the analysis
"""
_results = oDict()
graph_data = {}
nshuff = kwargs.get('nshuff', 500)
limit = kwargs.get('limit', 0)
bins = kwargs.get('bins', 100)
# limit = 0 implies enirely random shuffle, limit = 'x' implies nshuff
# number of shuffles in the range [-x x]
dur = self.get_time()[-1]
shift = nprand.uniform(low=limit - dur, high=dur - limit, size=nshuff)
raylZ = np.zeros((nshuff,))
vonMisesK = np.zeros((nshuff,))
for i in np.arange(nshuff):
shift_ftimes = ftimes + shift[i]
# Wrapping up the time
shift_ftimes[shift_ftimes > dur] -= dur
shift_ftimes[shift_ftimes < 0] += dur
hdData = self.hd_rate(shift_ftimes, update=False)
cs = CircStat(rho=hdData['smoothRate'], theta=hdData['bins'])
results = cs.calc_stat()
raylZ[i] = results['RaylZ']
vonMisesK[i] = results['vonMisesK']
graph_data['raylZ'] = raylZ
graph_data['vonMisesK'] = vonMisesK
hdData = self.hd_rate(ftimes, update=False)
cs.set_rho(hdData['smoothRate'])
results = cs.calc_stat()
graph_data['refRaylZ'] = results['RaylZ']
graph_data['refVonMisesK'] = results['vonMisesK']
graph_data['raylZCount'], ind, graph_data['raylZEdges'] = histogram(
raylZ, bins=bins)
graph_data['raylZPer95'] = np.percentile(raylZ, 95)
graph_data['vonMisesKCount'], ind, graph_data['vonMisesKEdges'] = histogram(
vonMisesK, bins=bins)
graph_data['vonMisesKPer95'] = np.percentile(vonMisesK, 95)
_results['HD Shuff Rayl Z Per 95'] = np.percentile(raylZ, 95)
_results['HD Shuff von Mises K Per 95'] = np.percentile(vonMisesK, 95)
self.update_result(_results)
return graph_data
[docs] def hd_shift(self, ftimes, shift_ind=np.arange(-10, 11)):
"""
Head directional analyses performed shifted in time.
Analysis of firing specificity of the unit with respect to animal's
head direction to oberve whether it represents past direction or
anicipates a future direction.
Parameters
----------
shift_ind : ndarray
Index of spatial resolution shift for the spike event time. Shift -1
implies shift to the past by 1 spatial time resolution, and +2 implies
shift to the future by 2 spatial time resoultion.
Returns
-------
dict
Graphical data of the analysis
"""
_results = oDict()
graph_data = {}
shift = shift_ind / self.get_sampling_rate()
shiftlen = shift.size
dur = self.get_time()[-1]
delta = np.zeros((shiftlen,))
skaggs = np.zeros((shiftlen,))
peakRate = np.zeros((shiftlen,))
for i in np.arange(shiftlen):
shift_ftimes = ftimes + shift[i]
# Wrapping up the time
shift_ftimes[shift_ftimes > dur] -= dur
shift_ftimes[shift_ftimes < 0] += dur
hdData = self.hd_rate_ccw(shift_ftimes, update=False)
delta[i] = hdData['bins'][np.argmax(
hdData['hdRateCCW'])] - hdData['bins'][np.argmax(hdData['hdRateCW'])]
hdData = self.hd_rate(shift_ftimes, update=False)
peakRate[i] = np.amax(hdData['smoothRate'])
skaggs[i] = self.skaggs_info(hdData['hdRate'], hdData['tcount'])
graph_data['delta'] = delta
graph_data['skaggs'] = skaggs
graph_data['peakRate'] = peakRate
graph_data['shiftTime'] = shift * 1000 # changing to milisecond
# Find out the optimum skaggs location
shiftUpsamp = np.arange(
shift[0], shift[-1], np.mean(np.diff(shift)) / 10)
skaggsUpsamp = np.interp(shiftUpsamp, shift, skaggs)
peakRateUpsamp = np.interp(shiftUpsamp, shift, peakRate)
dfit_result = linfit(shift, delta)
deltaFit = dfit_result['yfit']
sortInd = np.argsort(deltaFit)
_results['HD ATI'] = np.interp(
0, deltaFit[sortInd], shift[sortInd]) * 1000 if dfit_result['Pearson R'] >= 0.85 else np.nan
graph_data['deltaFit'] = deltaFit
imax = sg.argrelmax(skaggsUpsamp)[0]
maxloc = find(skaggsUpsamp[imax] == skaggsUpsamp.max())
_results['HD Opt Shift Skaggs'] = np.nan if maxloc.size != 1 else \
(np.nan if imax[maxloc] == 0 or imax[maxloc] ==
skaggsUpsamp.size else shiftUpsamp[imax[maxloc]][0] * 1000) # in milisecond
imax = sg.argrelmax(peakRateUpsamp)[0]
maxloc = find(peakRateUpsamp[imax] == peakRateUpsamp.max())
_results['HD Opt Shift Peak Rate'] = np.nan if maxloc.size != 1 else \
(np.nan if imax[maxloc] == 0 or imax[maxloc] ==
peakRateUpsamp.size else shiftUpsamp[imax[maxloc]][0] * 1000) # in milisecond
self.update_result(_results)
return graph_data
[docs] @staticmethod
def place_field(pmap, thresh=0.2, required_neighbours=9):
"""
Identify high firing neighbouring bins in the place map.
For each bin in the place map, it is assigned to an integer group.
These groups denote which neighbouring area the bin belongs to.
The default group is 0, which every bin belongs to initially.
Parameters
----------
pmap : ndarray
The firing map to calculate place fields from
thresh : float
The fraction of the peak firing that a bin must exceed
required_neighbours : int
The number of adjacent bins that must be together to form a field
Returns
-------
(ptag, largest_group_num)
ptag : np.ndarray
A 2D array indicating the tag of each bin in the firing map.
The tag is the same as the group the bin belongs to.
largest_group_num : int
The tag of the group with the highest firing rate.
If it is 0, then no groups were found that
satisfy the thresh and required_neighbours conditions.
"""
def alongColumn(pfield, ptag, J, I):
"""
Iterate along the columns of the ptags to find vertical
neighbours.
Parameters
----------
pfield : ndarray
The place field map, consisting of
1 for groups satisying rules and 0 otherwise
ptag : ndarray
The place field map, grouped into tags of neighbouring areas
J : int
The vertical index to start searching at
I : int
The horizontal index to start searching at
"""
Ji = J
Ii = I
rows = []
while J + 1 < ptag.shape[0]:
if not pfield[J + 1, I] or ptag[J + 1, I]:
break
else:
ptag[J + 1, I] = ptag[J, I]
rows.append(J + 1)
J += 1
J = Ji
while J - 1 > 0:
if not pfield[J - 1, I] or ptag[J - 1, I]:
break
else:
ptag[J - 1, I] = ptag[J, I]
rows.append(J - 1)
J -= 1
for J in rows:
if J != Ji:
ptag = alongRows(pfield, ptag, J, Ii)
return ptag
def alongRows(pfield, ptag, J, I):
"""
Iterate along the columns of the ptags, finds horizontal
neighbours.
Parameters
----------
pfield : ndarray
The place field map, consisting of
1 for groups satisying rules and 0 otherwise
ptag : ndarray
The place field map, grouped into tags of neighbouring areas
J : int
The vertical index to start searching at
I : int
The horizontal index to start searching at
"""
Ii = I
columns = []
while I + 1 <= ptag.shape[1]:
if not pfield[J, I + 1] or ptag[J, I + 1]:
break
else:
ptag[J, I + 1] = ptag[J, I]
columns.append(I + 1)
I += 1
I = Ii
while I - 1 >= 0:
if not pfield[J, I - 1] or ptag[J, I - 1]:
break
else:
ptag[J, I - 1] = ptag[J, I]
columns.append(I - 1)
I -= 1
for I in columns:
if I != Ii:
ptag = alongColumn(pfield, ptag, J, I)
return ptag
# Finding the place map firing field:
# Rules to form a field:
# 1. There are sufficient spikes in bin
# 2. The bin shares a side with other bins which contain spikes
# Apply Rule 1
where_are_NaNs = np.isnan(pmap)
pmap[where_are_NaNs] = 0
pmap = pmap / pmap.max()
weights = pmap
pmap = pmap > thresh
# Pad the place field with a single layer of zeros to compare
# neighbours
pfield = np.zeros(np.add(pmap.shape, 2))
pfield[1:-1, 1:-1] = pmap
# Apply rule 2
has_neighbour_horizontal = np.logical_or(
pfield[0:-2, 1:-1], pfield[2:, 1:-1])
has_neighbour_vertical = np.logical_or(
pfield[1:-1, 0:-2], pfield[1:-1, 2:])
# Combine rules 1 and 2
pfield[1:-1, 1:-1] = np.logical_and(
pmap,
np.logical_or(has_neighbour_horizontal, has_neighbour_vertical))
# Initialise all tags to 0
ptag = np.zeros(pfield.shape, dtype=int)
# Find the first non zero entry of the pfield
J, I = find2d(pfield)
# Group all the neighbouring pixels
group = 1
for (j, i) in zip(J, I):
if ptag[j, i] == 0:
ptag[j, i] = group
group = group + 1
# Tag all neighbours as being of the same group
ptag = alongColumn(pfield, ptag, j, i)
# Remove the padding
ptag = ptag[1:-1, 1:-1]
# Find the largest field, and also remove fields that are too small
# If there are no large enough fields, label all bins as 0
uniques, counts = np.unique(ptag[ptag > 0], return_counts=True)
max_count, largest_group_num = 0, 0
reduction = 0
for unique, count in zip(uniques, counts):
# Don't consider groups that are small
unique = unique - reduction
if count < required_neighbours:
ptag[ptag == unique] = 0
ptag[ptag > unique] = ptag[ptag > unique] - 1
reduction = reduction + 1
# Define the largest group to be the one with largest weight
# Could also be the one with the largest area
else:
interest_weights = weights[ptag == unique]
weight = np.sum(interest_weights)
if weight > max_count:
max_count = weight
largest_group_num = unique
return ptag, largest_group_num
[docs] @staticmethod
def place_field_centroid(pfield, fmap, group_num, **kwargs):
"""
Calculate the centroid of a place field.
This is performed as a centre of mass calculation.
Where the mass of a bin is the firing rate in that bin.
As such, passing the group number as 0 with
pfield containing all zeroes, would return the
centre of mass of the entire firing map.
Parameters
----------
pfield : ndarray
Input place field consisting of a map of groups
fmap : ndarray
Input firing map
group_num : int
The group to get the centroid for
**kwargs :
Keyword arguments
Returns
-------
ndarray
A list of co-ordinates for each place field group
"""
# For each group, get the list of co-ordinates from the pfield
co_ords = np.array(np.where(pfield == group_num))
weights = fmap[co_ords[0], co_ords[1]]
return centre_of_mass(co_ords, weights, axis=1)
[docs] def get_event_loc(self, ftimes, **kwargs):
"""
Calculate location of the event from its timestamps.
Parameters
----------
ftimes : ndarray
Timestamps of the spiking or any other events
**kwargs
Keyword arguments
Returns
-------
(ndarray, [ndarray, ndarray], ndarray)
ndarray
Index of the events in spatial-timestamps
[ Two item list containing
ndarray
x-coordinates of the event location
ndarray
y-ccordinates of the event location
]
ndarray
direction of the animal at the time of the event
"""
time = self.get_time()
lim = kwargs.get('range', [0, time.max()])
keep_zero_idx = kwargs.get('keep_zero_idx', False)
hist = histogram(
ftimes[np.logical_and(
ftimes >= lim[0], ftimes < lim[1])],
time)
vidInd = hist[1]
if keep_zero_idx:
retInd = vidInd
else:
retInd = vidInd[vidInd != 0]
return retInd, [self._pos_x[retInd],
self._pos_y[retInd]], self._direction[retInd]
[docs] def loc_shuffle(self, ftimes, **kwargs):
"""
Shuffling analysis of the unit.
This is used to see if the locational firing
specifity is by chance or actually correlated to the location of the
animal.
Parameters
----------
ftimes : ndarray
Timestamps of the spiking activity of a unit
**kwargs
Keyword arguments
Returns
-------
dict
Graphical data of the analysis
"""
_results = oDict()
graph_data = {}
nshuff = kwargs.get('nshuff', 500)
limit = kwargs.get('limit', 0)
bins = kwargs.get('bins', 100)
brAdjust = kwargs.get('brAdjust', False)
pixel = kwargs.get('pixel', 3)
chop_bound = kwargs.get('chop_bound', 5)
filter = kwargs.get('filter', ['b', 5])
# limit = 0 implies enirely random shuffle, limit = 'x' implies nshuff
# number of shuffles in the range [-x x]
dur = self.get_time()[-1]
shift = nprand.uniform(low=limit - dur, high=dur - limit, size=nshuff)
skaggs = np.zeros((nshuff,))
sparsity = np.zeros((nshuff,))
coherence = np.zeros((nshuff,))
for i in np.arange(nshuff):
shift_ftimes = ftimes + shift[i]
# Wrapping up the time
shift_ftimes[shift_ftimes > dur] -= dur
shift_ftimes[shift_ftimes < 0] += dur
placeData = self.place(shift_ftimes, filter=filter,
chop_bound=chop_bound, pixel=pixel,
brAdjust=brAdjust, update=False)
skaggs[i] = self.skaggs_info(placeData['fmap'], placeData['tmap'])
sparsity[i] = self.spatial_sparsity(
placeData['fmap'], placeData['tmap'])
coherence[i] = np.corrcoef(
placeData['fmap'][placeData['tmap'] != 0].flatten(),
placeData['smoothMap'][placeData['tmap'] != 0].flatten())[0, 1]
graph_data['skaggs'] = skaggs
graph_data['coherence'] = coherence
graph_data['sparsity'] = sparsity
placeData = self.place(ftimes, pixel=pixel, filter=filter, brAdjust=brAdjust,
chop_bound=chop_bound, update=False)
graph_data['refSkaggs'] = self.skaggs_info(
placeData['fmap'], placeData['tmap'])
graph_data['refSparsity'] = self.spatial_sparsity(
placeData['fmap'], placeData['tmap'])
graph_data['refCoherence'] = np.corrcoef(
placeData['fmap'][placeData['tmap'] != 0].flatten(),
placeData['smoothMap'][placeData['tmap'] != 0].flatten())[0, 1]
graph_data['skaggsCount'], graph_data['skaggsEdges'] = np.histogram(
skaggs, bins=bins)
graph_data['skaggs95'] = np.percentile(skaggs, 95)
graph_data['sparsityCount'], graph_data['sparsityEdges'] = np.histogram(
sparsity, bins=bins)
graph_data['sparsity05'] = np.percentile(sparsity, 5)
graph_data['coherenceCount'], graph_data['coherenceEdges'] = np.histogram(
coherence, bins=bins)
graph_data['coherence95'] = np.percentile(coherence, 95)
_results['Loc Skaggs 95'] = np.percentile(skaggs, 95)
_results['Loc Sparsity 05'] = np.percentile(sparsity, 95)
_results['Loc Coherence 95'] = np.percentile(coherence, 95)
self.update_result(_results)
return graph_data
[docs] def loc_shift(self, ftimes, shift_ind=np.arange(-10, 11), **kwargs):
"""
Locational analysis shifted in time.
Analysis of firing specificity of the unit with respect to animal's
location to oberve whether it represents past location of the animal or
anicipates a future location.
Parameters
----------
ftimes : ndarray
Timestamps of the spiking activity of a unit
shift_ind : ndarray
Index of spatial resolution shift for the spike event time. Shift -1
implies shift to the past by 1 spatial time resolution, and +2 implies
shift to the future by 2 spatial time resoultion.
**kwargs
Keyword arguments
Returns
-------
dict
Graphical data of the analysis
"""
_results = oDict()
graph_data = {}
brAdjust = kwargs.get('brAdjust', False)
pixel = kwargs.get('pixel', 3)
chop_bound = kwargs.get('chop_bound', 5)
_filter = kwargs.get('filter', ['b', 5])
# limit = 0 implies enirely random shuffle, limit = 'x' implies nshuff
# number of shuffles in the range [-x x]
shift = shift_ind / self.get_sampling_rate()
shiftlen = shift.size
dur = self.get_time()[-1]
skaggs = np.zeros((shiftlen,))
sparsity = np.zeros((shiftlen,))
coherence = np.zeros((shiftlen,))
for i in np.arange(shiftlen):
shift_ftimes = ftimes + shift[i]
# Wrapping up the time
shift_ftimes[shift_ftimes > dur] -= dur
shift_ftimes[shift_ftimes < 0] += dur
placeData = self.place(
shift_ftimes, pixel=pixel, filter=_filter,
brAdjust=brAdjust, chop_bound=chop_bound, update=False)
skaggs[i] = self.skaggs_info(placeData['fmap'], placeData['tmap'])
sparsity[i] = self.spatial_sparsity(
placeData['fmap'], placeData['tmap'])
coherence[i] = np.corrcoef(
placeData['fmap'][placeData['tmap'] != 0].flatten(),
placeData['smoothMap'][placeData['tmap'] != 0].flatten())[0, 1]
graph_data['skaggs'] = skaggs
graph_data['sparsity'] = sparsity
graph_data['coherence'] = coherence
graph_data['shiftTime'] = shift
# Find out the optimum skaggs location
shiftUpsamp = np.arange(
shift[0], shift[-1], np.mean(np.diff(shift)) / 4)
skaggsUpsamp = np.interp(shiftUpsamp, shift, skaggs)
sparsityUpsamp = np.interp(shiftUpsamp, shift, sparsity)
coherenceUpsamp = np.interp(shiftUpsamp, shift, coherence)
imax = sg.argrelmax(skaggsUpsamp)[0]
maxloc = find(skaggsUpsamp[imax] == skaggsUpsamp.max())
_results['Loc Opt Shift Skaggs'] = np.nan if maxloc.size != 1 else (
np.nan if imax[maxloc] == 0 or imax[maxloc] == skaggsUpsamp.size else shiftUpsamp[imax[maxloc]])
imin = sg.argrelmin(sparsityUpsamp)[0]
minloc = find(sparsityUpsamp[imin] == sparsityUpsamp.min())
_results['Loc Opt Shift Sparsity'] = np.nan if minloc.size != 1 else (
np.nan if imin[minloc] == 0 or imin[minloc] == sparsityUpsamp.size else shiftUpsamp[imin[minloc]])
imax = sg.argrelmax(coherenceUpsamp)[0]
maxloc = find(coherenceUpsamp[imax] == coherenceUpsamp.max())
_results['Loc Opt Shift Coherence'] = np.nan if maxloc.size != 1 else (
np.nan if imax[maxloc] == 0 or imax[maxloc] == coherenceUpsamp.size else shiftUpsamp[imax[maxloc]])
self.update_result(_results)
return graph_data
[docs] def loc_auto_corr(self, ftimes, **kwargs):
"""
Calculate the two-dimensional correlation of firing map.
Parameters
----------
ftimes : ndarray
Timestamps of the spiking activity of a unit
**kwargs
Keyword arguments
Returns
-------
dict
Graphical data of the analysis
"""
graph_data = {}
minPixel = kwargs.get('minPixel', 20)
pixel = kwargs.get('pixel', 3)
if 'update' in kwargs.keys():
del kwargs['update']
placeData = self.place(ftimes, update=False, **kwargs)
fmap = placeData['smoothMap']
fmap[np.isnan(fmap)] = 0
leny, lenx = fmap.shape
xshift = np.arange(-(lenx - 1), lenx)
yshift = np.arange(-(leny - 1), leny)
corrMap = np.zeros((yshift.size, xshift.size))
for J, ysh in enumerate(yshift):
for I, xsh in enumerate(xshift):
if ysh >= 0:
map1YInd = np.arange(ysh, leny)
map2YInd = np.arange(leny - ysh)
elif ysh < 0:
map1YInd = np.arange(leny + ysh)
map2YInd = np.arange(-ysh, leny)
if xsh >= 0:
map1XInd = np.arange(xsh, lenx)
map2XInd = np.arange(lenx - xsh)
elif xsh < 0:
map1XInd = np.arange(lenx + xsh)
map2XInd = np.arange(-xsh, lenx)
map1 = fmap[tuple(np.meshgrid(map1YInd, map1XInd))]
map2 = fmap[tuple(np.meshgrid(map2YInd, map2XInd))]
if map1.size < minPixel:
corrMap[J, I] = -1
else:
corrMap[J, I] = corr_coeff(map1, map2)
graph_data['corrMap'] = corrMap
graph_data['xshift'] = xshift * pixel
graph_data['yshift'] = yshift * pixel
return graph_data
[docs] def loc_rot_corr(self, ftimes, **kwargs):
"""
Calculate the rotational correlation of the firing map.
Parameters
----------
ftimes : ndarray
Timestamps of the spiking activity of a unit
**kwargs
Keyword arguments
Returns
-------
dict
Graphical data of the analysis
"""
graph_data = {}
binsize = kwargs.get('binsize', 3) # degrees
bins = np.arange(0, 360, binsize)
placeData = self.place(ftimes, update=False, **kwargs)
fmap = placeData['smoothMap']
fmap[np.isnan(fmap)] = 0
rotCorr = [corr_coeff(rot_2d(fmap, theta), fmap)
for k, theta in enumerate(bins)]
graph_data['rotAngle'] = bins
graph_data['rotCorr'] = rotCorr
return graph_data
[docs] def border(self, ftimes, **kwargs):
"""
Analysis of the firing of a unit with respect to the border.
Parameters
----------
ftimes : ndarray
Timestamps of the spiking activity of a unit
**kwargs
Keyword arguments
Returns
-------
dict
Graphical data of the analysis
"""
_results = oDict()
graph_data = {}
dist, xedges, yedges, distMat = self.get_border()
pixel = np.diff(xedges).mean()
update = kwargs.get('update', True)
thresh = kwargs.get('thresh', 0.2)
cbinsize = kwargs.get('cbinsize', 5) # Circular binsize in degrees
lim = kwargs.get('range', [0, self.get_duration()])
steps = kwargs.get('nstep', 5)
distBins = np.arange(dist.min(), dist.max() + pixel, pixel)
if 'update' in kwargs.keys():
del kwargs['update']
placeData = self.place(ftimes, range=lim, update=False, **kwargs)
fmap = placeData['smoothMap']
xind = np.array([])
yind = np.array([])
if placeData['xedges'].max() < xedges.max():
xind = xedges <= placeData['xedges'].max()
xedges = xedges[xind]
if placeData['yedges'].max() < yedges.max():
yind = yedges <= placeData['yedges'].max()
yedges = yedges[xind]
if xind.any():
distMat = distMat[:, xind]
if yind.any():
distMat = distMat[yind, :]
nanInd = np.isnan(fmap)
fmap[nanInd] = 0
# Calculated from smooth FR map not by smoothing from raw rate
smoothRate = np.zeros(distBins.shape)
for i, edge in enumerate(distBins):
edge_ind = distMat == edge
if edge_ind.any() and np.logical_and(
np.logical_not(nanInd), distMat == edge).any():
smoothRate[i] = fmap[np.logical_and(
np.logical_not(nanInd), distMat == edge)].mean()
fmap /= fmap.max()
tcount = histogram(dist, distBins)[0]
tcount = tcount / self.get_sampling_rate()
spikeDist = dist[self.get_event_loc(ftimes)[0]]
spike_count = histogram(spikeDist, distBins)[0].astype(tcount.dtype)
distRate = np.divide(spike_count, tcount, out=np.zeros_like(spike_count),
where=tcount != 0, casting='unsafe') # for skaggs only
pixelCount = histogram(distMat[np.logical_not(nanInd)], distBins)[0]
distCount = np.divide(
histogram(distMat[fmap >= thresh], distBins)[0], pixelCount,
out=np.zeros_like(distBins), where=pixelCount != 0, casting='unsafe')
circBins = np.arange(0, 360, cbinsize)
X, Y = np.meshgrid(xedges, np.flipud(yedges))
X = X - xedges[-1] / 2
Y = Y - yedges[-1] / 2
angDist = np.arctan2(Y, X) * 180 / np.pi
angDist[angDist < 0] += 360
meanDist = distMat[fmap >= thresh].mean()
cs = CircStat()
cs.set_theta(angDist[np.logical_and(
distMat <= meanDist, fmap >= thresh)])
angDistCount = cs.circ_histogram(circBins)[0]
# Circular linear map
circLinMap = np.zeros((distBins.size, circBins.size))
for i, edge in enumerate(distBins):
cs.set_theta(angDist[np.logical_and(
distMat == edge, fmap >= thresh)])
circLinMap[i, :] = cs.circ_histogram(circBins)[0]
perSteps = np.arange(0, 1, 1 / steps)
perDist = np.zeros(steps)
for i in np.arange(steps):
perDist[i] = distMat[np.logical_and(
np.logical_not(nanInd),
np.logical_and(fmap >= perSteps[i], fmap < perSteps[i] + 1 / steps))].mean()
if update:
_results['Border Skaggs'] = self.skaggs_info(distRate, tcount)
angDistExt = np.append(angDistCount, angDistCount)
segsize = find_chunk(angDistExt > 0)[0]
_results['Border Ang Ext'] = max(segsize) * cbinsize
cBinsInterp = np.arange(0, 360, 0.1)
dBinsInterp = np.arange(0, distBins[-1] + pixel, 0.1)
graph_data['cBinsInterp'] = cBinsInterp
graph_data['dBinsInterp'] = dBinsInterp
graph_data['circLinMap'] = sc.interpolate.interp2d(
circBins, distBins, circLinMap, kind='cubic')(cBinsInterp, dBinsInterp)
self.update_result(_results)
graph_data['distBins'] = distBins
graph_data['distCount'] = distCount
graph_data['circBins'] = circBins
graph_data['angDistCount'] = angDistCount
graph_data['distRate'] = distRate
graph_data['smoothRate'] = smoothRate
graph_data['perSteps'] = perSteps * 100
graph_data['perDist'] = perDist
return graph_data
[docs] def gradient(self, ftimes, **kwargs):
"""
Analysis of gradient cell.
A gradient cell is a unit whose firing rate gradually increases
as the animal traverses from the border to the center of the
environment.
Parameters
----------
ftimes : ndarray
Timestamps of the spiking activity of a unit
**kwargs
Keyword arguments
Returns
-------
dict
Graphical data of the analysis
"""
_results = oDict()
graph_data = {}
alim = kwargs.get('alim', 0.25)
blim = kwargs.get('blim', 0.25)
clim = kwargs.get('clim', 0.5)
graph_data = self.border(ftimes, **kwargs)
x = graph_data['distBins']
y = graph_data['smoothRate']
x = x[np.isfinite(y)]
y = y[np.isfinite(y)]
y = np.log(y, out=np.zeros_like(y), where=y != 0, casting='unsafe')
ai = y.max()
y0 = y[x == 0]
bi = ai - y0
d_half = x.mean()
for i, dist in enumerate(x):
if i < x.size - \
1 and (y0 + bi / 2) > y[i] and (y0 + bi / 2) <= y[i + 1]:
d_half = x[i:i + 2].mean()
ci = np.log(2) / d_half
def fit_func(x, a, b, c):
return a - b * np.exp(-c * x)
popt, pcov = curve_fit(
fit_func, x, y,
p0=[ai, bi, ci],
bounds=([(1 - alim) * ai, (1 - blim) * bi, (1 - clim) * ci],
[(1 + alim) * ai, (1 + blim) * bi, (1 + clim) * ci]),
max_nfev=100000)
a, b, c = popt
y_fit = fit_func(x, *popt)
gof = residual_stat(y, y_fit, 3)
rateFit = np.exp(y_fit)
graph_data['distBins'] = x
graph_data['rateFit'] = rateFit
graph_data['diffRate'] = b * c * np.multiply(rateFit, np.exp(-c * x))
_results['Grad Pearse R'] = gof['Pearson R']
_results['Grad Pearse P'] = gof['Pearson P']
_results['Grad adj Rsq'] = gof['adj Rsq']
_results['Grad Max Growth Rate'] = c * np.exp(a - 1)
_results['Grad Inflect Dist'] = np.log(b) / c
self.update_result(_results)
return graph_data
[docs] def grid(self, ftimes, **kwargs):
"""
Analysis of Grid cells.
These are characterised by formation of grid-like pattern
of high activity in the firing-rate map.
Parameters
----------
ftimes : ndarray
Timestamps of the spiking activity of a unit
**kwargs
Keyword arguments
Returns
-------
dict
Graphical data of the analysis
"""
_results = oDict()
tol = kwargs.get('angtol', 2)
binsize = kwargs.get('binsize', 3)
bins = np.arange(0, 360, binsize)
graph_data = self.loc_auto_corr(ftimes, update=False, **kwargs)
corrMap = graph_data['corrMap']
corrMap[np.isnan(corrMap)] = 0
xshift = graph_data['xshift']
yshift = graph_data['yshift']
pixel = np.int(np.diff(xshift).mean())
ny, nx = corrMap.shape
rpeaks = np.zeros(corrMap.shape, dtype=bool)
cpeaks = np.zeros(corrMap.shape, dtype=bool)
for j in np.arange(ny):
rpeaks[j, extrema(corrMap[j, :])[1]] = True
for i in np.arange(nx):
cpeaks[extrema(corrMap[:, i])[1], i] = True
ymax, xmax = find2d(np.logical_and(rpeaks, cpeaks))
peakDist = np.sqrt((ymax - find(yshift == 0))**2 +
(xmax - find(xshift == 0))**2)
sortInd = np.argsort(peakDist)
ymax, xmax, peakDist = ymax[sortInd], xmax[sortInd], peakDist[sortInd]
ymax, xmax, peakDist = (
ymax[1:7], xmax[1:7], peakDist[1:7]) if ymax.size >= 7 else ([], [], [])
theta = np.arctan2(yshift[ymax], xshift[xmax]) * 180 / np.pi
theta[theta < 0] += 360
sortInd = np.argsort(theta)
ymax, xmax, peakDist, theta = (
ymax[sortInd], xmax[sortInd], peakDist[sortInd], theta[sortInd])
graph_data['ymax'] = yshift[ymax]
graph_data['xmax'] = xshift[xmax]
meanDist = peakDist.mean()
X, Y = np.meshgrid(xshift, yshift)
distMat = np.sqrt(X**2 + Y**2) / pixel
# if all of them are within tolerance(25%)
if len(ymax) == np.logical_and(peakDist > 0.75 *
meanDist, peakDist < 1.25 * meanDist).sum():
maskInd = np.logical_and(
distMat > 0.5 * meanDist, distMat < 1.5 * meanDist)
rotCorr = np.array(
[corr_coeff(rot_2d(corrMap, theta)[
maskInd], corrMap[maskInd]) for k, theta in enumerate(bins)])
ramax, rimax, ramin, rimin = extrema(rotCorr)
if rimax.size and rimin.size:
mThetaPk, mThetaTr = (
np.diff(bins[rimax]).mean(), np.diff(bins[rimin]).mean())
else:
mThetaPk, mThetaTr = (None, None)
graph_data['rimax'] = rimax
graph_data['rimin'] = rimin
graph_data['anglemax'] = bins[rimax]
graph_data['anglemin'] = bins[rimin]
graph_data['rotAngle'] = bins
graph_data['rotCorr'] = rotCorr
if mThetaPk is not None and mThetaTr is not None:
isGrid = True if 60 - tol < mThetaPk < 60 + \
tol and 60 - tol < mThetaTr < 60 + tol else False
else:
isGrid = False
meanAlpha = np.diff(theta).mean()
psi = theta[np.array([2, 3, 4, 5, 0, 1])] - theta
psi[psi < 0] += 360
meanPsi = psi.mean()
_results['Is Grid'] = isGrid and 120 - tol < meanPsi < 120 + \
tol and 60 - tol < meanAlpha < 60 + tol
_results['Grid Mean Alpha'] = meanAlpha
_results['Grid Mean Psi'] = meanPsi
_results['Grid Spacing'] = meanDist * pixel
# Difference between highest Pearson R at peaks and lowest at
# troughs
_results['Grid Score'] = rotCorr[rimax].max() - \
rotCorr[rimin].min()
_results['Grid Orientation'] = theta[0]
else:
_results['Is Grid'] = False
_results['Grid Mean Alpha'] = np.nan
_results['Grid Mean Psi'] = np.nan
_results['Grid Spacing'] = np.nan
_results['Grid Score'] = np.nan
_results['Grid Orientation'] = np.nan
self.update_result(_results)
return graph_data
[docs] def multiple_regression(self, ftimes, **kwargs):
"""
Multiple-rgression analysis.
In this, the firing rate for each variable, namely
location, head-direction, speed, AHV, and distance from border, are
used to regress the instantaneous firing rate of the unit.
Parameters
----------
ftimes : ndarray
Timestamps of the spiking activity of a unit
**kwargs
Keyword arguments
Returns
-------
dict
Graphical data of the analysis
"""
_results = oDict()
graph_data = oDict()
subsampInterv = kwargs.get('subsampInterv', 0.1)
episode = kwargs.get('episode', 120)
nrep = kwargs.get('nrep', 1000)
sampRate = 1 / subsampInterv
stamp = subsampInterv
a_size = np.round(self.get_duration(), 4)
time = np.linspace(
0, a_size, endpoint=True,
num=int(np.round(a_size / stamp) + 1))
time = np.round(time, 4)
Y = histogram(ftimes, time)[0] * sampRate # Instant firing rate
nt = time.size
xloc, yloc, loc, hd, speed, ang_vel, distBorder = list(
np.zeros((7, nt)))
tmp = self.place(ftimes)
placeRate, xedges, yedges = (
tmp['smoothMap'], tmp['xedges'], tmp['yedges'])
placeRate[np.isnan(placeRate)] = 0
for i in np.arange(nt):
ind = find(np.logical_and(self.get_time() >=
time[i], self.get_time() < time[i] + stamp))
if len(ind) == 0:
continue
xloc[i] = np.median(self.get_pos_x()[ind])
yloc[i] = np.median(self.get_pos_y()[ind])
if histogram(yloc[i], yedges)[1] < yedges.size and histogram(
xloc[i], xedges)[1] < xedges.size:
loc[i] = placeRate[histogram(yloc[i], yedges)[
1], histogram(xloc[i], xedges)[1]]
hd[i] = np.median(self.get_direction()[ind])
speed[i] = np.median(self.get_speed()[ind])
ang_vel[i] = np.median(self.get_ang_vel()[ind])
distBorder[i] = np.median(self.get_border()[0][ind])
tmp = self.hd_rate(ftimes, update=False)
hd_rate, hdBins = (tmp['hdRate'], tmp['bins'])
cs = CircStat()
cs.set_theta(hd)
# replaced by corresponding rate
hd = hd_rate[cs.circ_histogram(hdBins)[1]]
# Speed+ ang_vel will be linearly modelled, so no transformation
# required; ang_vel will be replaced by the non-linear rate
tmp = self.border(ftimes, update=False)
borderRate, borderBins = (tmp['distRate'], tmp['distBins'])
# replaced by corresponding rate
distBorder = borderRate[histogram(distBorder, borderBins)[1]]
ns = int(episode / stamp) # row to select in random
X = np.vstack((loc, hd, speed, ang_vel, distBorder)).transpose()
lm = LinearRegression(fit_intercept=True, normalize=True)
Rsq = np.zeros((nrep, 6))
for i in np.arange(nrep):
ind = np.random.permutation(time.size)[:ns]
lm.fit(X[ind, :], Y[ind])
Rsq[i, 0] = lm.score(X[ind, :], Y[ind])
for j in np.arange(5):
varind = np.array([k for k in range(5) if k != j])
# np.ix_ is used for braodcasting the index arrays
lm.fit(X[np.ix_(ind, varind)], Y[ind])
Rsq[i, j + 1] = Rsq[i, 0] - \
lm.score(X[np.ix_(ind, varind)], Y[ind])
meanRsq = Rsq.mean(axis=0)
# Regresssion parameters are alays stored in following order
varOrder = ['Total', 'Loc', 'HD', 'Speed', 'Ang Vel', 'Dist Border']
graph_data['Rsq'] = Rsq
graph_data['meanRsq'] = meanRsq
graph_data['maxRsq'] = Rsq.max(axis=0)
graph_data['minRsq'] = Rsq.min(axis=0)
graph_data['stdRsq'] = Rsq.std(axis=0)
_results['Mult Rsq'] = meanRsq[0]
for i, key in enumerate(varOrder):
if i > 0:
_results['Semi Rsq ' + key] = meanRsq[i]
self.update_result(_results)
return graph_data
[docs] def interdependence(self, ftimes, **kwargs):
"""
Interdependence analysis.
The firing rate of each variable is
predicted from another variable and the distributive ratio is measured
between the predicted firing rate and the caclulated firing rate.
Parameters
----------
ftimes : ndarray
Timestamps of the spiking activity of a unit
**kwargs
Keyword arguments
Returns
-------
None
"""
_results = oDict()
pixel = kwargs.get('pixel', 3)
hdbinsize = kwargs.get('hdbinsize', 5)
spbinsize = kwargs.get('spbinsize', 1)
sprange = kwargs.get('sprange', [0, 40])
abinsize = kwargs.get('abinsize', 10)
ang_velrange = kwargs.get('ang_velrange', [-500, 500])
placeData = self.place(ftimes, pixel=pixel, update=False)
fmap = placeData['smoothMap']
fmap[np.isnan(fmap)] = 0
xloc = self.get_pos_x()
yloc = self.get_pos_y()
xedges = placeData['xedges']
yedges = placeData['yedges']
hdData = self.hd_rate(ftimes, binsize=hdbinsize, update=False)
bins = hdData['bins']
predRate = np.zeros(bins.size)
for i, b in enumerate(bins):
ind = np.logical_and(
hdData['hd'] >= b, hdData['hd'] < b + hdbinsize)
tmap = histogram2d(yloc[ind], xloc[ind], yedges, xedges)[0]
tmap /= self.get_sampling_rate()
predRate[i] = np.sum(fmap * tmap) / tmap.sum()
_results['DR HP'] = np.abs(
np.log((1 + hdData['smoothRate']) / (1 + predRate))).sum() / bins.size
spData = self.speed(ftimes, binsize=spbinsize,
range=sprange, update=False)
bins = spData['bins']
predRate = np.zeros(bins.size)
speed = self.get_speed()
for i, b in enumerate(bins):
ind = np.logical_and(speed >= b, speed < b + spbinsize)
tmap = histogram2d(yloc[ind], xloc[ind], yedges, xedges)[0]
tmap /= self.get_sampling_rate()
predRate[i] = np.sum(fmap * tmap) / tmap.sum()
_results['DR SP'] = np.abs(
np.log((1 + spData['rate']) / (1 + predRate))).sum() / bins.size
ang_velData = self.angular_velocity(
ftimes, binsize=abinsize, range=ang_velrange, update=False)
bins = np.hstack((ang_velData['leftBins'], ang_velData['rightBins']))
predRate = np.zeros(bins.size)
ang_vel = self.get_ang_vel()
for i, b in enumerate(bins):
ind = np.logical_and(ang_vel >= b, ang_vel < b + abinsize)
tmap = histogram2d(yloc[ind], xloc[ind], yedges, xedges)[0]
tmap /= self.get_sampling_rate()
predRate[i] = np.sum(fmap * tmap) / tmap.sum()
ang_velObs = np.hstack(
(ang_velData['leftRate'], ang_velData['rightRate']))
_results['DR AP'] = np.abs(
np.log((1 + ang_velObs) / (1 + predRate))).sum() / bins.size
borderData = self.border(ftimes, update=False)
bins = borderData['distBins']
dbinsize = np.diff(bins).mean()
predRate = np.zeros(bins.size)
border = self.get_border()[0]
for i, b in enumerate(bins):
ind = np.logical_and(border >= b, border < b + dbinsize)
tmap = histogram2d(yloc[ind], xloc[ind], yedges, xedges)[0]
tmap /= self.get_sampling_rate()
predRate[i] = np.sum(fmap * tmap) / tmap.sum()
_results['DR BP'] = np.abs(
np.log((1 + borderData['distRate']) / (1 + predRate))).sum() / bins.size
self.update_result(_results)
[docs] def __str__(self):
"""Return a friendly string representation of this object."""
return "{} object with {} samples at {}Hz".format(
"NeuroChaT NSpatial", self.get_total_samples(), self.get_sampling_rate())