# Source code for neurochat.nc_hdf

# -*- coding: utf-8 -*-
"""
This module implements Nhdf Class for NeuroChaT software.

@author: Md Nurul Islam; islammn at tcd dot ie

"""
import os
import io

import logging

import numpy as np

import h5py

from neurochat.nc_utils import log_exception


class Nhdf(object):
    """
    Manage importing and exporting NeuroChaT datasets to an HDF5 file.

    It also creates and manages the nomenclature for storage paths within
    the HDF5 file.

    Attributes
    ----------
    _filename : str
        The filename of the hdf5 file.
    f : h5py.File or None
        The h5py file object that is opened, None while no file is open.

    """

    def __init__(self, **kwargs):
        """See the class description."""
        self._filename = kwargs.get('filename', '')
        self.f = None
        self.__type = 'hdf'
        # Only an already existing file is opened on construction; a new
        # file is created lazily through set_filename()/file().
        if self._filename and os.path.exists(self._filename):
            self.file()

    def get_type(self):
        """
        Return the type of object. For Nhdf, this is always `hdf` type.

        Parameters
        ----------
        None

        Returns
        -------
        str

        """
        return self.__type

    def get_filename(self):
        """
        Return the full file of the HDF5 dataset.

        Parameters
        ----------
        None

        Returns
        -------
        str

        """
        return self._filename

    def set_filename(self, filename=None):
        """
        Set the full file of the HDF5 dataset and (re)open it.

        Nothing happens if `filename` is empty or None.

        Parameters
        ----------
        filename : str
            Filename of the HDF5 dataset

        Returns
        -------
        None

        """
        if not filename:
            return
        self._filename = filename
        try:
            self.file()
        except Exception:
            logging.error('Invalid file!')

    def get_file_object(self):
        """
        Return the file object that is opened using h5py.

        Parameters
        ----------
        None

        Returns
        -------
        object
            h5py file object, or None (with a warning) if no file is open

        """
        # h5py.File does not derive from io.IOBase, so the check has to be
        # made against the h5py type itself.
        if isinstance(self.f, h5py.File):
            return self.f
        logging.warning(
            'The file Nhdf instance is not open yet, '
            'use Nhdf.file() method to open it!')
        return None

    def file(self):
        """
        Open the file, and returns the file object.

        Any previously opened file is closed first. The file is opened in
        append mode and the basic groups are created if missing.

        Parameters
        ----------
        None

        Returns
        -------
        object
            h5py file object, or None if the file could not be opened

        """
        self.close()
        try:
            self.f = h5py.File(self._filename, 'a')
            self.initialize()
        except Exception as e:
            log_exception(e, 'Opening hdf file ' + str(self._filename))
        return self.f

    def close(self):
        """
        Close the h5py file object.

        Parameters
        ----------
        None

        Returns
        -------
        None

        """
        if isinstance(self.f, h5py.File):
            self.f.close()
        self.f = None

    def initialize(self):
        """
        Initialize the basic groups for the HDF5 file.

        Parameters
        ----------
        None

        Returns
        -------
        None

        """
        for group_name in ('acquisition', 'processing', 'analysis',
                           'epochs', 'general', 'stimulus'):
            self.f.require_group(group_name)

    def get_groups_in_path(self, path=''):
        """
        Return the names of groups or datasets in a path.

        Parameters
        ----------
        path : str
            path to HDF5 file group

        Returns
        -------
        list
            Names of the groups or datasets in the path. Empty if no file
            is open, the path is missing, or the path is not a group.

        """
        if self.f is not None and path in self.f:
            node = self.f[path]
            # Only groups have members; a dataset has no keys().
            if hasattr(node, 'keys'):
                return list(node.keys())
        logging.warning('No groups in the path: %s', path)
        return []

    @staticmethod
    def _data_type(data):
        """Return data.get_type(), or None (with an error log) on failure."""
        try:
            return data.get_type()
        except Exception:
            logging.error('The type of the data cannot be extracted!')
            return None

    @staticmethod
    def resolve_hdfname(data=None):
        """
        Return the name of the HDF5 file from the filenames of NeuroChaT data.

        Parameters
        ----------
        data
            One of the NeuroChaT data objects

        Returns
        -------
        hdf_name : str
            Hdf5 file name, or None if it cannot be resolved

        """
        data_type = Nhdf._data_type(data)
        if data_type is None:
            return None

        hdf_name = None
        file_name = data.get_filename()
        system = data.get_system()
        file_exists = os.path.exists(file_name)

        if system == 'NWB':
            # NWB names are stored as '<hdf file>+<path in the file>'
            hdf_name = file_name.split('+')[0]
        elif system == 'SpikeInterface':
            if file_exists:
                f_path, f_name = os.path.split(file_name)
                hdf_name = os.path.join(
                    f_path, os.path.splitext(f_name)[0] + "_NC_NWB.hdf5")
            else:
                hdf_name = "NC_NWB.hdf5"

        if file_exists:
            f_path, f_name = os.path.split(file_name)
            stem = os.path.splitext(f_name)[0]
            # os.path.join (not os.sep.join) so that a file in the current
            # directory does not resolve to a path under the root.
            if system == 'Axona':
                if data_type in ('spike', 'lfp'):
                    hdf_name = os.path.join(f_path, stem + '.hdf5')
                elif data_type == 'spatial':
                    # Drop the last '_'-separated token of the file stem
                    session = '_'.join(stem.split('_')[:-1])
                    hdf_name = os.path.join(f_path, session + '.hdf5')
            elif system == 'Neuralynx':
                # The hdf5 file is named after the containing directory
                hdf_name = os.path.join(
                    f_path, os.path.basename(f_path) + '.hdf5')

        return hdf_name

    def resolve_datapath(self, data=None):
        """
        Resolve and return the path of the dataset from NeuroChaT data objects.

        This is used to obtain a path within the HDF5 file.

        Parameters
        ----------
        data
            NeuroChaT data objects

        Returns
        -------
        str
            Path of the NeuroChaT data, or None if it cannot be resolved

        """
        # No resolution for NWB file, this function will not be called if the
        # system == 'NWB'
        data_type = self._data_type(data)
        if data_type is None:
            return None

        if data_type == 'spatial':
            return '/processing/Behavioural/Position'

        tag = self.get_file_tag(data)
        if tag and data_type == 'spike':
            return '/processing/Shank/' + str(tag)
        if tag and data_type == 'lfp':
            return '/processing/Neural Continuous/LFP/' + str(tag)
        return None

    @staticmethod
    def get_file_tag(data=None):
        """
        Return the file tag or extension to name the neural data in the HDF5 file.

        Parameters
        ----------
        data : NSpike or NLfp
            Neural data objects of NeuroChaT

        Returns
        -------
        str
            File extention (Axona) or name (Neuralynx) of the neural
            datasets. None if the data is not spike or lfp data.

        """
        data_type = Nhdf._data_type(data)
        # data is one of NSpike or Nlfp instance
        if data_type not in ('spike', 'lfp'):
            return None

        f_name = data.get_filename()
        system = data.get_system()
        if system == 'NWB':
            return f_name.split('+')[-1].split('/')[-1]

        name, ext = os.path.splitext(os.path.basename(f_name))
        if system == 'Axona':
            return ext[1:]  # extension without the leading dot
        if system == 'Neuralynx':
            return name
        if system == "SpikeInterface":
            group = data._spikeinterface_group
            return group if group is not None else name
        return None

    def resolve_analysis_path(self, spike=None, lfp=None):
        """
        Return path of the dataset where analysis results will be stored.

        This path is also the unique unit ID.

        Parameters
        ----------
        spike : NSpike
            Spike data object
        lfp : NLfp
            Lfp data object

        Returns
        -------
        str
            Unique unit ID resolved from spike and lfp filenames.
            This is the name of the path to store the data of NeuroChaT
            analysis. Parts that cannot be resolved are left out.

        """
        # Each input is an object
        path = ''

        tag = None
        if self._data_type(spike) == 'spike':
            tag = self.get_file_tag(spike)
        if tag is None:
            logging.error('Please specify a valid spike data!')
        else:
            tag = str(tag)
            if spike.get_system() == 'Axona' or not tag.startswith('TT'):
                tag = 'TT' + tag
            path += tag + '_SS_' + str(spike.get_unit_no())

        if self._data_type(lfp) == 'lfp':
            lfp_tag = self.get_file_tag(lfp)
            if lfp_tag is not None:
                path += '_' + str(lfp_tag)

        return path

    def save_dataset(self, path=None, name=None, data=None, create_group=True):
        """
        Store a dataset to a specific path.

        An existing dataset of the same name is replaced.

        Parameters
        ----------
        path : str
            Path of a group in HDF5 file
        name : str
            Name of the new dataset
        data : ndarray or list of numbers
            Data to be stored
        create_group : bool
            If True, creates a new group if the 'path' is not in the file

        Returns
        -------
        None

        """
        if not path:
            logging.error('Invalid group path specified!')
            return
        if not name:
            logging.error('Please provide a name for the dataset!')
            return
        if self.f is None or not (create_group or path in self.f):
            logging.error('hdf5 file path cannot be created or restored!')
            return

        g = self.f.require_group(path)
        if name in g:
            del g[name]

        # None cannot be stored in a numeric dataset, NaN is used instead
        if isinstance(data, list):
            data = [np.nan if item is None else item for item in data]
        try:
            data = np.array(data)
        except Exception as e:
            # Best effort: let h5py try to convert the raw object itself
            logging.debug('Could not convert %s to an array: %s', name, e)

        try:
            g.create_dataset(name=name, data=data)
        except Exception as e:
            log_exception(e, 'Saving ' + name + ' dataset to hdf5 file')

    def get_dataset(self, group=None, path='', name=''):
        """
        Retrieve a dataset from a specific path.

        Parameters
        ----------
        group : h5py.Group
            Group in the HDF5 file to search in. If it is not a h5py group
            (e.g. None), self.f is used as the group.
        path : str
            Name of the member group. This path is relative to the 'group'
        name : str
            Name of the dataset

        Returns
        -------
        ndarray or numeric objects
            Value of the dataset, or None if it is not found

        """
        g = group if isinstance(group, h5py.Group) else self.f
        if g is None:
            logging.error(
                str(path) + ' not found! '
                + 'Specify a valid path or name or check if a proper '
                + 'group is specified!')
            return None

        if path in g:
            node = g[path]
            if isinstance(node, h5py.Dataset):
                return np.array(node)
            if isinstance(node, h5py.Group):
                if name in node:
                    return np.array(node[name])
                logging.error(
                    'Specify a valid name for the required dataset')
            return None

        if name in g:
            return np.array(g[name])

        logging.error(
            path + ' not found! '
            + 'Specify a valid path or name or check if a proper '
            + 'group is specified!')
        return None

    def save_dict_recursive(self, path=None, name=None, data=None,
                            create_group=True):
        """
        Store a dictionary dataset to a specific path.

        If the dictionary is nested, it creates a group for each of the
        outermost keys.

        Parameters
        ----------
        path : str
            Path of the parent group in HDF5 file, with or without a
            trailing '/'
        name : str
            Name of the group that holds the items of the dictionary
        data : dict
            Data to be stored
        create_group : bool
            If True, creates a new group if the 'path' is not in the file

        Returns
        -------
        None

        """
        if not isinstance(data, dict):
            logging.error(
                'Nhdf class method save_dict_recursive() takes only '
                'dictionary data input!')
            return

        # Make sure exactly one separator sits between path and name
        base = path or ''
        if base and not base.endswith('/'):
            base += '/'
        group_path = base + name

        for key, value in data.items():
            if isinstance(value, dict):
                self.save_dict_recursive(
                    path=group_path + '/', name=key, data=value,
                    create_group=create_group)
            else:
                self.save_dataset(
                    path=group_path, name=key, data=value,
                    create_group=create_group)

    def save_attributes(self, path=None, attr=None):
        """
        Store attributes to a group or dataset.

        Parameters
        ----------
        path : str
            Path of a group or dataset in HDF5 file
        attr : dict
            Attribute names and values in a dictionary

        Returns
        -------
        None

        """
        # path has to be the absolute path of a group
        if self.f is None or not path or path not in self.f:
            logging.error('Please provide a valid hdf5 path!')
            return
        if not isinstance(attr, dict):
            logging.error('Please specify the attributes in a dictionary!')
            return
        g = self.f[path]
        for key, val in attr.items():
            g.attrs[key] = val

    def save_object(self, obj=None):
        """
        Store a NeuroChaT dataset to the HDF5 file.

        It resolves the name first and then stores the data in the storage
        path. Nothing is stored if the file of the object does not exist.

        Parameters
        ----------
        obj
            One of the NeuroChaT data types

        Returns
        -------
        None

        """
        try:
            obj_type = obj.get_type()
        except Exception as e:
            log_exception(
                e, 'Object passed is not a neurochat data type')
            return

        try:
            if os.path.isfile(obj.get_filename()):
                # Dispatch to save_spatial / save_lfp / save_spike ...
                save_fun = getattr(self, 'save_' + obj_type)
                save_fun(obj)
        except Exception as e:
            log_exception(e, 'Saving hdf5 dataset')

    @staticmethod
    def _write_series(group, data, num_samples, timestamps):
        """Write the data / num_samples / timestamps datasets to a group."""
        group.create_dataset(name='data', data=data)
        group.create_dataset(name='num_samples', data=num_samples)
        group.create_dataset(name='timestamps', data=timestamps)

    def save_spatial(self, spatial=None):
        """
        Store NSpatial() dataset to the HDF5 file.

        The file is closed afterwards, also if storing fails.

        Parameters
        ----------
        spatial : NSpatial()
            Spatial data object in NeuroChaT

        Returns
        -------
        None

        """
        # derive the path from the filename to ensure uniqueness
        self.set_filename(self.resolve_hdfname(data=spatial))
        try:
            # Get the spatial data path/group
            path = self.resolve_datapath(data=spatial)

            # delete old data
            if path in self.f:
                del self.f[path]

            # Create group afresh
            g = self.f.require_group(path)
            self.save_attributes(path=path, attr=spatial.get_record_info())

            n_samples = spatial.get_total_samples()
            timestamps = spatial.get_time()

            loc = np.empty((n_samples, 2))
            loc[:, 0] = spatial.get_pos_x()
            loc[:, 1] = spatial.get_pos_y()

            # Unit information needs to be included; this requires a
            # spatial.getUnit() method that is not implemented yet.
            series = (
                ('location', loc),
                ('direction', spatial.get_direction()),
                ('speed', spatial.get_speed()),
                ('angular velocity', spatial.get_ang_vel()),
            )
            for series_name, values in series:
                sub_group = g.require_group(path + '/' + series_name)
                self._write_series(sub_group, values, n_samples, timestamps)
        finally:
            self.close()

    def save_lfp(self, lfp=None):
        """
        Store NLfp() dataset to the HDF5 file.

        The file is closed afterwards, also if storing fails.

        Parameters
        ----------
        lfp : NLfp()
            LFP data object in NeuroChaT

        Returns
        -------
        None

        """
        # derive the path from the filename to ensure uniqueness
        self.set_filename(self.resolve_hdfname(data=lfp))
        try:
            # Get the lfp data path/group
            path = self.resolve_datapath(data=lfp)

            # delete old data
            if path in self.f:
                del self.f[path]

            # Create group afresh
            g = self.f.require_group(path)
            self.save_attributes(path=path, attr=lfp.get_record_info())

            self._write_series(
                g, lfp.get_samples(), lfp.get_total_samples(),
                lfp.get_timestamp())
        finally:
            self.close()

    def save_spike(self, spike=None):
        """
        Store NSpike() dataset to the HDF5 file.

        The file is closed afterwards, also if storing fails.

        Parameters
        ----------
        spike : NSpike()
            Spike data object in NeuroChaT

        Returns
        -------
        None

        """
        # derive the path from the filename to ensure uniqueness
        self.set_filename(self.resolve_hdfname(data=spike))
        try:
            # Get the spike data path/group
            path = self.resolve_datapath(data=spike)

            # delete old data
            if path in self.f:
                del self.f[path]

            # Create group afresh
            g = self.f.require_group(path)
            self.save_attributes(path=path, attr=spike.get_record_info())

            g_clust = g.require_group(path + '/' + 'Clustering')
            g_wave = g.require_group(path + '/' + 'EventWaveForm/WaveForm')

            # From chX dictionary, create a higher order np array
            # NC waves are stored in waves['ch1'], waves['ch2'] etc. ways
            waves = spike.get_waveform()
            n_spikes = spike.get_total_spikes()
            n_samples = spike.get_samples_per_spike()
            stacked_channels = np.empty(
                (n_spikes, n_samples, spike.get_total_channels()))
            for i, channel_waves in enumerate(waves.values()):
                stacked_channels[:, :, i] = channel_waves

            timestamps = spike.get_timestamp()

            g_wave.create_dataset(name='data', data=stacked_channels)
            g_wave.create_dataset(
                name='electrode_idx', data=spike.get_channel_ids())
            g_wave.create_dataset(name='num_events', data=n_spikes)
            g_wave.create_dataset(name='num_samples', data=n_samples)
            g_wave.create_dataset(name='timestamps', data=timestamps)

            # save Cluster number
            g_clust.create_dataset(
                name='cluster_nums', data=spike.get_unit_list())
            g_clust.create_dataset(name='num', data=spike.get_unit_tags())
            g_clust.create_dataset(name='times', data=timestamps)
        finally:
            self.close()

    def save_cluster(self, clust=None):
        """
        Store NClust() dataset to the HDF5 file.

        Not implemented yet, only a warning is logged.

        Parameters
        ----------
        clust : NClust()
            Cluster data object in NeuroChaT

        Returns
        -------
        None

        """
        # Nclust is a NSpike derivative (inherited from NSpike) to add
        # clustering facilities to the NSpike data
        # But we will consider putting it within NSpike itself
        # This will store data to Shank's Clustering and Feature Extraction
        # group
        logging.warning('save_cluster() method is not implemented yet!')

    def path_exists(self, path):
        """
        Return True if self.f exists and path is in it.

        path can be either a path in the hdf5 file, or the full name of a
        hdf5 file followed by '+' and the path in that file. In the latter
        case that file is opened as a side effect.

        Parameters
        ----------
        path : str
            The path to check for.

        Returns
        -------
        bool
            Whether or not the path is exists

        See also
        --------
        neurochat.nc_control.exist_hdf_path

        """
        if not path:
            return False
        if "+" in path:
            # Split on the first '+' only so unpacking cannot fail
            name, path = path.split("+", 1)
            if not os.path.isfile(name):
                return False
            self.set_filename(name)
        if self.f is None or not path:
            return False
        return path in self.f