# -*- coding: utf-8 -*-
"""
This module implements the NEvent Class for NeuroChaT software.
@author: Md Nurul Islam; islammn at tcd dot ie
"""
import logging
import os
from collections import OrderedDict as oDict
import re
import numpy as np
from neurochat.nc_base import NBase
from neurochat.nc_spike import NSpike
from neurochat.nc_lfp import NLfp
[docs]class NEvent(NBase):
"""
Store external events or stimuli and relate them to neural data.
This data class is the placeholder for the dataset that contains
information about external events or stimuli.
Events are stored as a name (str), tag (int) and a timestamp (float).
Each tag is a unique integer number representing a particular event.
Parameters
----------
event_names: list of str, or np.ndarray
The name of each timestamped event.
timestamps: list of float, or np.ndarray
The time of each event.
event_train: list of int, or np.ndarry
The unique integer tag of each timestamped event.
**kwargs:
Keyword arguments passed to NBase init.
"""
def __init__(
self, event_names=[], timestamps=[], event_train=[],
**kwargs):
"""See NEvent class description."""
super().__init__(**kwargs)
self._event_trig_averages = []
self._curr_tag = []
self._curr_name = []
if isinstance(event_names, list):
self._event_names = np.array(event_names, dtype=object)
else:
self._event_names = event_names
if isinstance(timestamps, list):
self._timestamp = np.array(timestamps, dtype='f')
else:
self._timestamp = timestamps
if isinstance(event_train, list):
self._event_train = np.array(event_train, dtype=int)
else:
self._event_train = event_train
self._type = 'event'
self._timebase = None
self._total_samples = None
self._bytes_per_timestamp = None
[docs] def get_event_name(self, event_tag=None):
"""
Return name of the event from its tag.
Parameters
----------
event_tag : int
Returns
-------
event_name : str
Name of the event
"""
if event_tag is None:
event_name = self._event_names
elif event_tag in self._event_train:
event_name = self._event_names[
np.nonzero(self._event_train == event_tag)[0]]
else:
event_name = None
return event_name
[docs] def get_tag(self, event_name=None):
"""
Return tag of the event from its name.
Parameters
----------
event_name : str
Returns
-------
event_tag : int
Tag of the event
"""
if event_name is None:
event_tag = self._curr_tag
elif event_name == 'all':
event_tag = self._event_trig_averages
elif event_name in self._event_names:
event_tag = self._event_train[
np.nonzero(self._event_names == event_name)[0]]
else:
event_tag = None
return event_tag
[docs] def set_curr_tag(self, event):
"""
Set current tag of to consider for analysis.
Parameters
----------
event : str or int
If str, represent the name of the event.
If int, represents event tag.
Returns
-------
None
"""
if event is None:
pass
elif event in self._event_train:
self._curr_tag = event
self._curr_name = self.get_event_name(event)
elif event in self._event_names:
self._curr_tag = self.get_tag(event)
self._curr_name = event
[docs] def set_curr_name(self, name):
"""
Set current event using event name.
Parameters
----------
name : str
Name of the event
Returns
-------
None
"""
self.set_curr_tag(name)
[docs] def set_timebase(self, timebase):
"""
Set timebase.
Parameters
----------
timebase : int
Timebase
Returns
-------
None
"""
self._timebase = timebase
[docs] def get_timebase(self):
"""
Get timebase.
Returns
-------
float
"""
return self._timebase
[docs] def get_event_stamp(self, event=None):
"""
Return timestamps for a particular event.
Parameters
----------
event : str or int
If str, represent the name of the event.
If int, represents event tag.
Returns
-------
timestamp : ndarray
Timestamps of the event
"""
if event is None:
tag = self._curr_tag
if event in self._event_names:
tag = self.get_tag(event)
elif event in self._event_train:
tag = event
where = np.nonzero(self._event_train == tag)
timestamp = self._timestamp[where]
return timestamp
[docs] def get_timestamp(self):
"""
Return timestamps for all events.
Parameters
----------
None
Returns
-------
timestamp : ndarray
Timestamps of all the events
"""
return self._timestamp
[docs] def get_event_train(self):
"""
Return tags for all events in temporal order.
Parameters
----------
None
Returns
-------
ndarray
Train of events as train of tags
"""
return self._event_train
[docs] def get_total_samples(self):
"""
Return the total number of samples.
Parameters
----------
None
Returns
-------
int
The number of samples.
"""
return self._total_samples
[docs] def get_bytes_per_timestamp(self):
"""
Return the number of bytes per timestamp.
Parameters
----------
None
Returns
-------
int
The number of bytes per timestamp.
"""
return self._bytes_per_timestamp
def _set_event_train(self, event_train):
"""
Set tags for all events.
Parameters
----------
event_train : ndarray
Train of events as train of tags
Returns
-------
None
"""
self._event_train = event_train
def _set_timestamp(self, timestamp):
"""
Set timestamps for all events.
Parameters
----------
timestamp : ndarray
Returns
-------
timestamp : ndarray
Timestamps of all the events
"""
self._timestamp = timestamp
def _set_total_samples(self, nsamples):
"""
Set tags for all events.
Parameters
----------
event_train : ndarray
Train of events as train of tags
Returns
-------
None
"""
self._total_samples = nsamples
def _set_bytes_per_timestamp(self, bytes_per_timestamp):
self._bytes_per_timestamp = bytes_per_timestamp
[docs] def load(self, filename=None, system=None):
"""
Read event file from the recording formats.
This is currently only implemented for the Axona .stm format.
Parameters
----------
filename : str
Full filepath of the event data.
system : str
Data format or the recording system.
Currently, only "Axona" is supported.
Returns
-------
None
"""
if system is None:
system = self._system
if not system == "Axona":
logging.error("Only implemented event reading in Axona currently")
return
if filename is None:
filename = self._filename
if os.path.isfile(filename):
with open(filename, 'rb') as f:
while True:
line = f.readline()
try:
line = line.decode('latin-1')
except BaseException:
break
if line == '':
break
if line.startswith('trial_date'):
self._set_date(
' '.join(line.replace(',', ' ').split()[1:]))
if line.startswith('trial_time'):
self._set_time(line.split()[1])
if line.startswith('experimenter'):
self._set_experimenter(' '.join(line.split()[1:]))
if line.startswith('comments'):
self._set_comments(' '.join(line.split()[1:]))
if line.startswith('duration'):
self._set_duration(float(''.join(line.split()[1:])))
if line.startswith('sw_version'):
self._set_file_version(line.split()[1])
if line.startswith('timebase'):
self.set_timebase(
int(''.join(re.findall(r'\d+.\d+|\d+', line))))
if line.startswith('bytes_per_timestamp'):
self._set_bytes_per_timestamp(
int(''.join(re.findall(r'\d+', line))))
if line.startswith('num_' + 'stm' + '_samples'):
self._set_total_samples(
int(''.join(re.findall(r'\d+', line))))
if line.startswith("data_start"):
break
num_stm_samples = self.get_total_samples()
bytes_per_timestamp = self.get_bytes_per_timestamp()
timebase = self.get_timebase()
f.seek(0, 0)
header_offset = []
while True:
try:
buff = f.read(10).decode('UTF-8')
except BaseException:
break
if buff == 'data_start':
header_offset = f.tell()
break
else:
f.seek(-9, 1)
if not header_offset:
raise ValueError('data_start marker not found!')
else:
f.seek(header_offset, 0)
byte_buffer = np.fromfile(f, dtype='uint8')
stim_time = np.zeros([num_stm_samples, ], dtype='f')
for i in range(num_stm_samples):
start_idx = bytes_per_timestamp * i
end_idx = start_idx + bytes_per_timestamp
chunk = byte_buffer[start_idx:end_idx]
time_val = (
16777216 * chunk[0] +
65536 * chunk[1] +
256 * chunk[2] +
chunk[3]) / timebase
stim_time[i] = time_val
tags = np.array([1 for i in range(num_stm_samples)])
self._event_train = tags
names = np.array(
["Stimulation" for i in range(num_stm_samples)])
self._event_names = names
self.set_curr_tag(1)
self._set_timestamp(stim_time)
else:
logging.error(
"No events file found for file {}".format(filename))
def _create_tag(self, name_train):
"""
Create tags from event trains and names.
This is useful if the events are described with only their names.
Parameters
----------
name_train : list of str
Trains of events described with their names
Returns
-------
None
"""
self._event_trig_averages = list(range(0, len(self._event_names), 1))
if type(name_train).__module__ == np.__name__:
self._event_train = np.zeros(name_train.shape)
for i, name in enumerate(self._event_names):
self._event_train[name_train ==
name] = self._event_trig_averages[i]
[docs] def add_spike(self, spike=None, **kwargs):
"""
Add new spike node to current NEvent() object.
Parameters
----------
spike : NSpike
NSPike object. If None, new object is created
Returns
-------
`:obj:NSpike`
A new NSpike() object
"""
new_spike = self._add_node(NSpike, spike, 'spike', **kwargs)
return new_spike
[docs] def load_spike(self, names='all'):
"""
Load datasets of the spike nodes.
The name of each node is used for obtaining the filenames.
Parameters
----------
names : list of str
Names of the nodes to load.
If 'all', all the spike nodes are loaded.
Returns
-------
None
"""
if names == 'all':
for spike in self._spikes:
spike.load()
else:
logging.error("Spikes by name has yet to be implemented")
[docs] def add_lfp(self, lfp=None, **kwargs):
"""
Add a new LFP node to current NEvent() object.
Parameters
----------
lfp : NLfp
NLfp object. If None, new object is created
Returns
-------
`:obj:Nlfp`
A new NLfp() object
"""
new_lfp = self._add_node(NLfp, lfp, 'lfp', **kwargs)
return new_lfp
[docs] def load_lfp(self, names=None):
"""
Load datasets of the LFP nodes.
The name of each node is used for obtaining the filenames.
Parameters
----------
names : list of str
Names of the nodes to load. If `all`, all LFP nodes are loaded
Returns
-------
None
"""
if names is None:
self.load()
elif names == 'all':
for lfp in self._lfp:
lfp.load()
else:
logging.error("Lfp by name has yet to be implemented")
[docs] def psth(self, event=None, spike=None, **kwargs):
"""
Calculate peri-stimulus time histogram (PSTH).
Parameters
----------
event
Event name or tag
spike : NSpike
NSpike object to characterize
**kwargs
Keyword arguments
Returns
-------
dict
Graphical data of the analysis
"""
graph_data = oDict()
if not event:
event = self._curr_tag
elif event in self._event_names:
event = self.get_tag(event)
if not spike:
spike = kwargs.get('spike', 'xxxx')
spike = self.get_spike(spike)
if event:
if spike:
graph_data = spike.psth(self.get_event_stamp(event), **kwargs)
else:
logging.error('No valid spike specified')
else:
logging.error(str(event) + ' is not a valid event')
return graph_data
[docs] def phase_dist(self, lfp=None, **kwargs):
"""
Analysis of event to LFP phase distribution.
Delegates to NLfp().phase_dist()
Parameters
----------
lfp : NLfp
LFP object which contains the LFP data.
**kwargs
Keyword arguments
Returns
-------
dict
Graphical data of the analysis
See also
--------
nc_lfp.NLfp().phase_dist()
"""
if lfp is None:
logging.error('LFP data not specified!')
else:
_lfp = self._get_instance(NLfp, lfp, 'lfp')
_lfp.phase_dist(self.get_event_stamp(self.get_tag()), **kwargs)
[docs] def plv(self, lfp=None, **kwargs):
"""
Calculate phase-locking value of event train to underlying LFP signal.
Delegates to NLfp().plv()
Parameters
----------
lfp : NLfp
LFP object which contains the LFP data
**kwargs
Keyword arguments
Returns
-------
dict
Graphical data of the analysis
See also
--------
nc_lfp.NLfp().plv()
"""
if lfp is None:
logging.error('LFP data not specified!')
else:
_lfp = self._get_instance(NLfp, lfp, 'lfp')
_lfp.plv(self.get_event_stamp(self.get_tag()), **kwargs)
[docs] def __str__(self):
"""
Return a user friendly (time, name, tag) string.
Only returns the first and last 10 events if more than 20
events.
"""
output_str = ""
if len(self._timestamp) == 0:
output_str = str([])
else:
for i, (a, b, c) in enumerate(zip(
self._timestamp, self._event_train, self._event_names)):
if i < 10:
output_str = "{}{}: {}\n".format(
output_str, i + 1, (a, b, c))
elif ((i == 10) and (len(self._timestamp) > 20)):
output_str = "{}...\n".format(output_str)
elif (i >= len(self._timestamp) - 10):
output_str = "{}{}: {}\n".format(
output_str, i + 1, (a, b, c))
output_str = output_str[:-1]
return (
"Neurochat NEvent object with " +
"event info:\n{}".format(output_str))
[docs] def __repr__(self):
"""Return a REPL string, eval(repr(self)) = self."""
return "{}({!r},{!r},{!r})".format(
self.__class__.__qualname__,
self._event_names, self._timestamp, self._event_train)
# def sfc(self, lfp=None, **kwargs):
# if lfp is None:
# logging.error('LFP data not specified!')
# else:
# _lfp = self._get_instance(NLfp, lfp, 'lfp')
# _lfp.sfc(self.get_event_stamp(self.get_tag()), **kwargs)
if __name__ == "__main__":
event_times = np.random.rand(100) * 100
event_tags = np.random.randint(low=0, high=10, size=100)
event_names = np.array(["Name{}".format(x) for x in event_tags])
array = np.array
n_event = NEvent(event_names, event_times, event_tags)
rep = repr(n_event)
print("Representation is {}".format(rep))
ret_event = eval(rep)
print("Original event is {}".format(ret_event))