Source code for percephone.core.recording

"""
01/11/2023
Ourania Semelidou
Théo Gauvrit

Recording object classes
"""
import json
import os
import matplotlib
import numpy as np
import pandas as pd
import scipy.signal as ssig
import matplotlib.pyplot as plt
from percephone.utils.io import read_info, correction_drift_fluo, get_idx_frame_mesc, extract_analog_from_mesc
from percephone.analysis.response import *
from percephone.analysis.mlr import mlr
from percephone.analysis.mlr_models import classic_model
from percephone.utils.math_formulas import sigmoid_fit

matplotlib.use("Qt5Agg")
plt.switch_backend("Qt5Agg")


[docs] class Recording: """ The Recording class represents a recording session for one mouse and provides methods to analyze the data. Attributes ---------- filename : int The filename of the recording session. sf : float The sampling frequency of the recording session. genotype : str The genotype of the mouse recorded. threshold : float The threshold stimulation amplitude. input_path : str The path to the folder containing the recording data (ending with /). matrices : dict[str, dict[str, numpy.ndarray]] Stores the different matrices computed from the recording as 2D numpy.ndarray. The first dictionary has 2 keys : "EXC" and "INH" neurons, each having a dictionary as value. Both dictionaries are then keyed by the computed parameters ("Responsivity", "AUC", etc...) and have a 2D np.ndarray (nb neurons * nb stimulations) as value. df_f_exc : numpy.ndarray Stores the computed ΔF/F values for excitatory neurons at each timestep as a 2D numpy.ndarray (nb neurons * nb frames) df_f_inh : numpy.ndarray Stores the computed ΔF/F values for inhibitory neurons at each timestep as a 2D numpy.ndarray (nb neurons * nb frames) spks_exc : numpy.ndarray #TODO: check where it is computed Stores the spikes trains for excitatory neurons at each timestep as a 2D numpy.ndarray (nb neurons * nb frames) spks_inh : numpy.ndarray Stores the spikes trains for inhibitory neurons at each timestep as a 2D numpy.ndarray (nb neurons * nb frames) """ def __init__(self, input_path, rois_path, mean_f_bsl, cache=True, detection=True): """ Initializes the Recording object with the given input path, the rois file path, and cache parameters. Parameters ---------- input_path: str Path to the folder that contains the input recording files. rois_path: str Path to the Excel ROI file. cache: bool, optional (default: True) Boolean value indicating whether to use cached files if available (for ΔF/F). """ # Initialization of the instance attributes by reading the ROIs file folder_name = os.path.basename(os.path.normpath(input_path)) + "/" rois = pd.read_excel(rois_path) self.filename, inhibitory_ids, self.sf, self.genotype, self.condition, self.threshold, self.iti1_only, self.hit_rates, self.session_id = read_info(folder_name, rois, detection=detection) try: if self.filename == 7554 and self.genotype == "KO-DMSO": _, _, self.x0_psy, self.k_psy = sigmoid_fit(np.arange(start=0, stop=13, step=2), self.hit_rates, p0=[4.0, 1.0]) else: _, _, self.x0_psy, self.k_psy = sigmoid_fit(np.arange(start=0, stop=13, step=2), self.hit_rates) except Exception as e: self.x0_psy = np.nan self.k_psy = np.nan print(f"{self.filename} → Could not fit a sigmoid on hit rates: {e}") self.session_threshold = 2 * round(self.x0_psy / 2) if 0 < self.x0_psy < 12 else 12 self.bounded_x0 = self.x0_psy if 0 < self.x0_psy < 12 else 12 self.session_threshold = 8 if self.filename == 6606 else self.session_threshold # 0% hit rate at 6 (computed) self.input_path = input_path self.matrices = {"EXC": {}, "INH": {}} iscell = np.load(input_path + 'iscell.npy', allow_pickle=True) # Create a dimension in iscell to define excitatory and inhibitory cells exh_inh = np.ones((len(iscell), 1)) # array to define excitatory exh_inh[inhibitory_ids] = 0 # correct the array to define inhibitory as 0 is_exh_inh = np.append(iscell, exh_inh, axis=1) # add the array in the iscell to have a 3rd column with exh/inh cells_list = np.concatenate(np.argwhere(is_exh_inh[:, 0])) excitatory_ids = np.concatenate(np.argwhere(is_exh_inh[cells_list, 2])) # list with excitatory cells if os.path.exists(input_path + 'df_f_exc.npy') and cache: self.df_f_exc = np.load(input_path + 'df_f_exc.npy') self.df_f_inh = np.load(input_path + 'df_f_inh.npy') else: self.df_f_exc = self.compute_df_f(excitatory_ids, input_path + 'df_f_exc.npy', mean_f_bsl) self.df_f_inh = self.compute_df_f(inhibitory_ids, input_path + 'df_f_inh.npy', mean_f_bsl) if self.filename == 4445: self.df_f_exc = correction_drift_fluo(self.df_f_exc, input_path + 'df_f_exc.npy') self.df_f_inh = correction_drift_fluo(self.df_f_inh, input_path + 'df_f_inh.npy') if os.path.exists(input_path + 'spks.npy'): spks = np.load(self.input_path + "spks.npy") self.spks_exc = spks[excitatory_ids] self.spks_inh = spks[inhibitory_ids] # Using the already computed repsonsivity matrix if existing if os.path.exists(input_path + 'matrice_resp_exc.npy') and os.path.exists(input_path + 'matrice_resp_inh.npy'): self.matrices["EXC"]["Responsivity"] = np.load(self.input_path + "matrice_resp_exc.npy") self.matrices["INH"]["Responsivity"] = np.load(self.input_path + "matrice_resp_inh.npy") if os.path.exists(input_path + 'matrice_resp_exc_dff.npy') and os.path.exists(input_path + 'matrice_resp_inh_dff.npy'): print(f"{self.filename}-{self.condition} → DFF Resp files in th folder") self.matrices["EXC"]["Responsivity_dff"] = np.load(self.input_path + "matrice_resp_exc_dff.npy") self.matrices["INH"]["Responsivity_dff"] = np.load(self.input_path + "matrice_resp_inh_dff.npy") # Using the already computed peak delay and amplitude matrices if existing if os.path.exists(input_path + 'matrice_peak_delay_exc.npy') and os.path.exists(input_path + 'matrice_peak_delay_inh.npy'): self.matrices["EXC"]["Peak_delay"] = np.load(self.input_path + "matrice_peak_delay_exc.npy") self.matrices["INH"]["Peak_delay"] = np.load(self.input_path + "matrice_peak_delay_inh.npy") if os.path.exists(input_path + 'matrice_peak_amp_exc.npy') and os.path.exists(input_path + 'matrice_peak_amp_inh.npy'): self.matrices["EXC"]["Peak_amplitude"] = np.load(self.input_path + "matrice_peak_amp_exc.npy") self.matrices["INH"]["Peak_amplitude"] = np.load(self.input_path + "matrice_peak_amp_inh.npy") # Using the already computed AUC matrices if existing if os.path.exists(input_path + 'matrice_auc_exc.npy') and os.path.exists(input_path + 'matrice_auc_inh.npy'): self.matrices["EXC"]["AUC"] = np.load(self.input_path + "matrice_auc_exc.npy") self.matrices["INH"]["AUC"] = np.load(self.input_path + "matrice_auc_inh.npy") if os.path.exists(input_path + 'matrice_cum_auc_exc.npy') and os.path.exists(input_path + 'matrice_cum_auc_inh.npy'): self.matrices["EXC"]["cum_AUC"] = np.load(self.input_path + "matrice_cum_auc_exc.npy") self.matrices["INH"]["cum_AUC"] = np.load(self.input_path + "matrice_cum_auc_inh.npy") if os.path.exists(input_path + 'matrice_cum_auc_pre_exc.npy') and os.path.exists(input_path + 'matrice_cum_auc_pre_inh.npy'): self.matrices["EXC"]["cum_AUC_pre"] = np.load(self.input_path + "matrice_cum_auc_pre_exc.npy") self.matrices["INH"]["cum_AUC_pre"] = np.load(self.input_path + "matrice_cum_auc_pre_inh.npy") if os.path.exists(input_path + 'matrice_cum_auc_fixpre_exc.npy') and os.path.exists(input_path + 'matrice_cum_auc_fixpre_inh.npy'): self.matrices["EXC"]["cum_AUC_fixpre"] = np.load(self.input_path + "matrice_cum_auc_fixpre_exc.npy") self.matrices["INH"]["cum_AUC_fixpre"] = np.load(self.input_path + "matrice_cum_auc_fixpre_inh.npy") if os.path.exists(input_path + 'matrice_pos_auc_fixpre_exc.npy') and os.path.exists(input_path + 'matrice_pos_auc_fixpre_inh.npy'): self.matrices["EXC"]["pos_AUC_fixpre"] = np.load(self.input_path + "matrice_pos_auc_fixpre_exc.npy") self.matrices["INH"]["pos_AUC_fixpre"] = np.load(self.input_path + "matrice_pos_auc_fixpre_inh.npy") if os.path.exists(input_path + 'matrice_neg_auc_fixpre_exc.npy') and os.path.exists(input_path + 'matrice_neg_auc_fixpre_inh.npy'): self.matrices["EXC"]["neg_AUC_fixpre"] = np.load(self.input_path + "matrice_neg_auc_fixpre_exc.npy") self.matrices["INH"]["neg_AUC_fixpre"] = np.load(self.input_path + "matrice_neg_auc_fixpre_inh.npy") if os.path.exists(input_path + 'hit_tuned_exc.npy') and os.path.exists(input_path + 'hit_tuned_inh.npy'): self.hit_tuned_exc = np.load(self.input_path + "hit_tuned_exc.npy") self.hit_tuned_inh = np.load(self.input_path + "hit_tuned_inh.npy") if os.path.exists(input_path + 'amp_tuned_exc.npy') and os.path.exists(input_path + 'amp_tuned_inh.npy'): self.amp_tuned_exc = np.load(self.input_path + "amp_tuned_exc.npy") self.amp_tuned_inh = np.load(self.input_path + "amp_tuned_inh.npy") assert len(inhibitory_ids) == self.df_f_inh.shape[0], (f"{self.filename} → The number of inhibitory neurons " f"doesn't match between ROI ({len(inhibitory_ids)}) and " f"df_f_inh.npy file ({self.df_f_inh.shape[0]})")
[docs] def compute_df_f(self, cell_ids, save_path, mean_f_bsl): """ Compute ΔF/F and save it as a numpy.ndarray Note --------- Defining The Baseline F0: Portera-Cailliau baseline (He et al., Goel et al.): "baseline period is the 10-s period with the lowest variation (s.d.) in ΔF/F." Since to get the ΔF/F we already need to define a baseline, here the Baseline Period was defined as the 10s period with the lowest variation in F_neuropil_corrected Parameters ---------- mean_f_bsl: boolean compute df/f with mean F for correction cell_ids: list ids of cells for which the df_f will be computed. save_path : str The folder where the df_f matrix will be saved (same as the input path) Returns ------- df_f_percen : numpy.ndarray ΔF/F of all the cells selected (nb neurons * nb frames) """ print(f"[{self.filename}] Calculation Delta F / F.") f_ = np.load(self.input_path + 'F.npy', allow_pickle=True) print(f"[{self.filename}] Nb of frames in F: {len(f_[0])}") f_neu = np.load(self.input_path + 'Fneu.npy', allow_pickle=True) f_ = f_[cell_ids, :] f_neu = f_neu[cell_ids, :] session_n_frames = f_.shape[1] all_rois = f_.shape[0] # Correct for Neuropil contamination factor = 0.7 f_nn_corrected = f_ - f_neu * factor # Define The Baseline F0 # Find the 10s period (300 frames) with the lowest stdev in f_nn_corrected # window binning every 300 frames f_binned_10s = np.lib.stride_tricks.sliding_window_view(f_nn_corrected, 300, axis=1) # stdev for each binned 300 frames for all the cells f_binned_stdev = np.ndarray.std(f_binned_10s, axis=2) # find the indices of the smallest stdev (of the 300 frames with less variability) min_stdev_window_index = np.argmin(f_binned_stdev, axis=1) # get the values of the window that corresponds to the index with the smallest stdev for ech cell f_binned_10s_for_baseline = f_binned_10s[:, min_stdev_window_index, :] # transform the 3D array to 2D, eliminated axis checked and is correct f_binned_10s_for_baseline_2d = f_binned_10s_for_baseline[:, 0, :] # get the mean of the 300 values of each window for each cell f_baseline = np.mean(f_binned_10s_for_baseline_2d, axis=1) # Calculate the df/f # create a 2D array with the baseline value for each frame of f_nn_corrected f_baseline_2d_array = np.repeat(f_baseline, session_n_frames).reshape(all_rois, session_n_frames) if mean_f_bsl: f_baseline_2d_array = np.transpose([np.mean(f_nn_corrected, axis=1)]*len(f_nn_corrected[0])) # df/f for all cells for each frame df_f = np.divide(np.subtract(f_nn_corrected, f_baseline_2d_array), f_baseline_2d_array) df_f_percen = df_f * 100 np.save(save_path, df_f_percen) return df_f_percen
[docs] def zscore(self, dff): """ Computes the standardized zscore from the ΔF/F. Parameters ---------- dff: numpy.ndarray delta f over f array from inh or exc Returns ------- zscore: numpy.ndarray zscore for one set of neurons (inh or exc) """ data = np.concatenate(np.stack( dff[:, np.linspace(self.stim_time - int(0.5 * self.sf), self.stim_time, num=int(0.5 * self.sf) + 1, dtype=int)], axis=2)) mean_bsl = np.mean(data, axis=0) std = np.std(data, axis=0) zsc = np.divide(np.subtract(dff, mean_bsl[:, np.newaxis]), std[:, np.newaxis]) return zsc
[docs] def responsivity(self, dff=False): """ Compute responsivity matrices for both excitatory and inhibitory neurons. For a given neuron and a given stimulation, one integer among the following is added to the matrix : - 1 if there is an increase in the neuron's activity - 0 if there is no change in the neuron's activity - -1 if there is a decrease in the neuron's activity This method updates the 'Responsivity' matrices for the excitatory and inhibitory neurons in the 'matrices' attribute. """ print("Calcul of repsonsivity.") if dff: exc_act = self.df_f_exc inh_act = self.df_f_inh suffix = "_dff" else: exc_act = self.zscore_exc inh_act = self.zscore_inh suffix = "" self.matrices["EXC"][f"Responsivity{suffix}"] = np.array(resp_matrice(self, exc_act)) self.matrices["INH"][f"Responsivity{suffix}"] = np.array(resp_matrice(self, inh_act)) np.save(self.input_path + f"matrice_resp_exc{suffix}.npy", self.matrices["EXC"][f"Responsivity{suffix}"]) np.save(self.input_path + f"matrice_resp_inh{suffix}.npy", self.matrices["INH"][f"Responsivity{suffix}"])
def hit_tuned(self): self.hit_tuned_exc = classify_neurons(self, n_type="EXC", seed=42, n_shuffles=5000, zscore=False) self.hit_tuned_inh = classify_neurons(self, n_type="INH", seed=42, n_shuffles=5000, zscore=False) np.save(self.input_path + "hit_tuned_exc.npy", self.hit_tuned_exc) np.save(self.input_path + "hit_tuned_inh.npy", self.hit_tuned_inh) def amp_tuned(self): self.amp_tuned_exc = amp_tuned_neurons(self, n_type="EXC", seed=42, n_shuffles=5000, zscore=False) self.amp_tuned_inh = amp_tuned_neurons(self, n_type="INH", seed=42, n_shuffles=5000, zscore=False) np.save(self.input_path + "amp_tuned_exc.npy", self.hit_tuned_exc) np.save(self.input_path + "amp_tuned_inh.npy", self.hit_tuned_inh)
[docs] def delay_onset_map(self): """ Calculate the delay onset map for the excitatory and inhibitory neurons. This method updates the 'Delay_onset' matrices for the excitatory and inhibitory neurons in the 'matrices' attribute. """ self.matrices["EXC"]["Delay_onset"] = delay_matrice(self, self.df_f_exc, self.stim_time, self.matrices["EXC"]["Responsivity"]) self.matrices["INH"]["Delay_onset"] = delay_matrice(self, self.df_f_inh, self.stim_time, self.matrices["INH"]["Responsivity"])
[docs] def auc(self): """ Computes the zscore AUC matrices of the signal during the response period. Note --------- Computes the AUC between 0 (the neuron normalized baseline) and the positive part of the curve for positive responses, or between 0 and the negative part of the curve for negative responses. 10 interpolated values are added between each frame for a more precise calculation of the AUC. This method updates the 'AUC' matrices for the excitatory and inhibitory neurons in the 'matrices' attribute. """ # stim AUC self.matrices["EXC"]["AUC"] = auc_matrice(self, self.df_f_exc, self.matrices["EXC"]["Responsivity"], period="stim", method="adaptive") self.matrices["INH"]["AUC"] = auc_matrice(self, self.df_f_inh, self.matrices["INH"]["Responsivity"], period="stim", method="adaptive") np.save(self.input_path + "matrice_auc_exc.npy", self.matrices["EXC"]["AUC"]) np.save(self.input_path + "matrice_auc_inh.npy", self.matrices["INH"]["AUC"]) self.matrices["EXC"]["cum_AUC"] = auc_matrice(self, self.df_f_exc, self.matrices["EXC"]["Responsivity"], period="stim", method="cumulative") self.matrices["INH"]["cum_AUC"] = auc_matrice(self, self.df_f_inh, self.matrices["INH"]["Responsivity"], period="stim", method="cumulative") np.save(self.input_path + "matrice_cum_auc_exc.npy", self.matrices["EXC"]["cum_AUC"]) np.save(self.input_path + "matrice_cum_auc_inh.npy", self.matrices["INH"]["cum_AUC"]) # pre-stim AUC self.matrices["EXC"]["cum_AUC_pre"] = auc_matrice(self, self.df_f_exc, self.matrices["EXC"]["Responsivity"], period="prestim", method="cumulative") self.matrices["INH"]["cum_AUC_pre"] = auc_matrice(self, self.df_f_inh, self.matrices["INH"]["Responsivity"], period="prestim", method="cumulative") np.save(self.input_path + "matrice_cum_auc_pre_exc.npy", self.matrices["EXC"]["cum_AUC_pre"]) np.save(self.input_path + "matrice_cum_auc_pre_inh.npy", self.matrices["INH"]["cum_AUC_pre"]) self.matrices["EXC"]["cum_AUC_fixpre"] = auc_matrice(self, self.df_f_exc, self.matrices["EXC"]["Responsivity"], period="fixed_prestim", method="cumulative") self.matrices["INH"]["cum_AUC_fixpre"] = auc_matrice(self, self.df_f_inh, self.matrices["INH"]["Responsivity"], period="fixed_prestim", method="cumulative") np.save(self.input_path + "matrice_cum_auc_fixpre_exc.npy", self.matrices["EXC"]["cum_AUC_fixpre"]) np.save(self.input_path + "matrice_cum_auc_fixpre_inh.npy", self.matrices["INH"]["cum_AUC_fixpre"]) # pre-stim positive self.matrices["EXC"]["pos_AUC_fixpre"] = auc_matrice(self, self.df_f_exc, self.matrices["EXC"]["Responsivity"], period="fixed_prestim", method="positive") self.matrices["INH"]["pos_AUC_fixpre"] = auc_matrice(self, self.df_f_inh, self.matrices["INH"]["Responsivity"], period="fixed_prestim", method="positive") np.save(self.input_path + "matrice_pos_auc_fixpre_exc.npy", self.matrices["EXC"]["pos_AUC_fixpre"]) np.save(self.input_path + "matrice_pos_auc_fixpre_inh.npy", self.matrices["INH"]["pos_AUC_fixpre"])
def auc_neg(self): # pre-stim negative self.matrices["EXC"]["neg_AUC_fixpre"] = auc_matrice(self, self.df_f_exc, self.matrices["EXC"]["Responsivity"], period="fixed_prestim", method="negative") self.matrices["INH"]["neg_AUC_fixpre"] = auc_matrice(self, self.df_f_inh, self.matrices["INH"]["Responsivity"], period="fixed_prestim", method="negative") np.save(self.input_path + "matrice_neg_auc_fixpre_exc.npy", self.matrices["EXC"]["neg_AUC_fixpre"]) np.save(self.input_path + "matrice_neg_auc_fixpre_inh.npy", self.matrices["INH"]["neg_AUC_fixpre"])
[docs] def peak_delay_amp(self): """ Calculate the zscore peak delay and peak amplitude for each neuron and each stimulation. The peak is the maximum amplitude when the responsivity is 1 or the minimum amplitude when the responsivity is -1. numpy.NaN is added to the matrix when the responsivity is 0. This method updates the "Peak_delay" and "Peak_amplitude" matrices for the excitatory and inhibitory neurons in the "matrices" attribute. """ self.matrices["EXC"]["Peak_delay"], self.matrices["EXC"]["Peak_amplitude"] = peak_matrices(self, self.zscore_exc, self.matrices["EXC"][ "Responsivity"]) self.matrices["INH"]["Peak_delay"], self.matrices["INH"]["Peak_amplitude"] = peak_matrices(self, self.zscore_inh, self.matrices["INH"][ "Responsivity"]) np.save(self.input_path + "matrice_peak_delay_exc.npy", self.matrices["EXC"]["Peak_delay"]) np.save(self.input_path + "matrice_peak_delay_inh.npy", self.matrices["INH"]["Peak_delay"]) np.save(self.input_path + "matrice_peak_amp_exc.npy", self.matrices["EXC"]["Peak_amplitude"]) np.save(self.input_path + "matrice_peak_amp_inh.npy", self.matrices["INH"]["Peak_amplitude"])
[docs] def get_perc_resp(self, pattern=1, n_type="EXC", dff_resp=False): """Returns an array of the percentage of responsive neurons per across trials""" assert n_type in ["EXC", "INH"], "Please provide a valid neuron type (EXC or INH)" assert pattern in [-1, 0, 1, 2], "Please provide a valid pattern (-1, 0, 1 or 2)" if dff_resp: resp_mat = np.array(self.matrices[n_type]["Responsivity_dff"]) else: resp_mat = np.array(self.matrices[n_type]["Responsivity"]) nb_neurons = self.zscore_exc.shape[0] if n_type == "EXC" else self.zscore_inh.shape[0] if pattern == 2: count = np.sum(resp_mat != 0, axis=0) else: count = np.sum(resp_mat == pattern, axis=0) return count/nb_neurons * 100
def get_mean_param(self, pattern=1, n_type="EXC", parameter="Peak_amplitude"): assert n_type in ["EXC", "INH"], "Please provide a valid neuron type (EXC or INH)" assert pattern in [-1, 0, 1, 2], "Please provide a valid pattern (-1, 0, 1 or 2)" assert parameter in ["Peak_amplitude", "Peak_delay", "AUC", "cum_AUC"], "Please provide a valid parameter (Peak_amplitude or Peak_delay)" resp_mat = np.array(self.matrices[n_type]["Responsivity"]) para_mat = np.array(self.matrices[n_type][parameter]) if pattern == 2: abs_inh_para_mat = np.absolute(np.where(resp_mat == -1, para_mat, np.nan)) filtered_para_mat = np.where(resp_mat == 1, para_mat, np.where(resp_mat == -1, abs_inh_para_mat, np.nan)) else: filtered_para_mat = np.where(resp_mat == pattern, para_mat, np.nan) return np.nanmean(filtered_para_mat, axis=0)
class RecordingStimulusOnly(Recording): """ The RecordingStimulusOnly class represents a recording session for one mouse and provides methods to analyze the data. Attributes ---------- analog : pandas.DataFrame Analog file stored as a pandas DataFrame. stim_time : list[int] A list of the stimulation times in ms. stim_ampl : list[float] A list of the stimulation amplitudes. """ # def __init__(self, input_path, inhibitory_ids, sf, correction=True): # before modif def __init__(self, input_path, rois_path, mean_f_bsl=False, correction=True): """ This method initializes an instance of the class. It reads the 'analog.txt' file from the input path and checks if the "stim_ampl_time.csv" file exists. If it exists, it reads the stimulus time and amplitude data from it. If it doesn't exist, it calls the "synchronization_no_iti" method with the provided correction boolean. Parameters ---------- input_path : str Path to the folder that contains the input files. inhibitory_ids : list The list of inhibitory neuron IDs. sf : float The sampling frequency of the recording session. correction : bool, optional Whether to perform correction. Default is True. """ # super().__init__(input_path, inhibitory_ids, sf) # TODO: bad initialization of the instance super().__init__(input_path, rois_path, mean_f_bsl, cache=True, detection=False) self.analog = pd.read_csv(input_path + 'analog.txt', sep="\t") print(f"{self.filename} -> Initialization successful.") if os.path.exists(input_path + 'stim_ampl_time.csv'): print(f"{self.filename} -> Analog information already computed. Reading stimulus time and amplitude.") self.stim_time = pd.read_csv(input_path + 'stim_ampl_time.csv', usecols=['stim_time']).values.flatten() self.stim_ampl = pd.read_csv(input_path + 'stim_ampl_time.csv', usecols=['stim_ampl']).to_numpy().flatten() else: print(f"{self.filename} -> Starting synchronization...") self.synchronization_no_iti(correction) self.zscore_exc = self.zscore(self.df_f_exc) self.zscore_inh = self.zscore(self.df_f_inh) self.lick_time = np.array([]) self.detected_stim = np.array([]) self.stim_durations = np.full_like(self.stim_time, 15, dtype=int) def synchronization_no_iti(self, correction_shift): """ Get the stimulus time and amplitude from the analog file and save a csv file with the stimulus amplitude and time (stimulus starting time) in ms. Note ------- Define stimulus delivery based on the analog Stimulus = when analog[1]>=0.184 First find the first peak for each stimulus (stimuli every 500ms) Then find the stimulus onset when amplitude is >0.184 Parameters ------- correction_shift : bool Indicate if the correction of the shift of frame should be executed. """ print('Obtaining time and amplitude from analog.') analog_trace = self.analog.iloc[:, 1].to_numpy() stim_peak_indx, stim_properties = ssig.find_peaks(analog_trace, prominence=0.15, distance=200) peaks_diff = np.diff(stim_peak_indx) indices = np.concatenate([[True], peaks_diff > 50000]) stim_peak_indx = stim_peak_indx[indices] stim_ampl_analog = analog_trace[stim_peak_indx] stim_ampl_pre = np.around(stim_ampl_analog, decimals=1) stim_ampl_sort = np.sort(np.unique(stim_ampl_pre)) stim_ampl = np.zeros(len(stim_ampl_pre)) convert = {4: [4, 6, 8, 10], 5: [4, 6, 8, 10, 12], 6: [2, 4, 6, 8, 10, 12], 7: [0, 2, 4, 6, 8, 10, 12]} print("Checkpoint 1: OK") for i in range(len(stim_ampl_sort)): stim_ampl[stim_ampl_pre == stim_ampl_sort[i]] = convert[len(stim_ampl_sort)][i] def stim_onset_calc(peak_index): time_window = 400 # 40 ms before peak signal_window = analog_trace[peak_index - time_window:peak_index] return np.argwhere(signal_window > (3 * np.std(analog_trace[peak_index - 1000:peak_index - 400]) + np.mean( analog_trace[peak_index - 1000:peak_index - 400])))[0][0] + peak_index - time_window # 0.184 stim_onset_idx = list(map(stim_onset_calc, stim_peak_indx)) stim_onset_time = self.analog.iloc[stim_onset_idx, 0] self.stim_time = stim_onset_time # TEMP 13-09-2023 # plot the analog to test the index of stimulus onset fig, axs = plt.subplots(2, 1, figsize=(18, 10), sharex=True) axs[0].plot(analog_trace) axs[0].plot(stim_onset_idx, analog_trace[stim_onset_idx], 'x') plt.show() stim_onsets = np.array([int((stim / 1000) * self.sf) for stim in stim_onset_time]) print("Checkpoint 2: OK") def correction(idx): """ Perform a frames' correction. 8 is the number of frames that the last stimulus will be shifted before""" coeff = ((8 / len(self.df_f_exc[0])) / (stim_onsets[-1] - stim_onsets[0])) b = coeff * stim_onsets[0] return int(idx - (((coeff * idx) - b) * len(self.df_f_inh[0]))) self.stim_ampl = stim_ampl if correction_shift: self.stim_time = np.array(list(map(correction, stim_onsets))).flatten() else: self.stim_time = stim_onsets print(self.stim_time.shape, self.stim_ampl.shape) pd.DataFrame({'stim_time': self.stim_time, 'stim_ampl': stim_ampl}).to_csv(self.input_path + 'stim_ampl_time.csv')
[docs] class RecordingAmplDet(Recording): """ Class for analyzing recording data with amplitude detection. Inherits from the Recording class. Attributes ---------- analog : pandas.DataFrame Analog file stored as a pandas DataFrame. xls : pandas.DataFrame bpod file stored as a pandas DataFrame. stim_time : numpy.ndarray of int A 1D numpy.ndarray of the stimulation times in frames. stim_ampl : numpy.ndarray of float TODO: why not int A 1D numpy.ndarray of the stimulation amplitudes. stim_durations : numpy.ndarray of float TODO: why not int A 1D numpy.ndarray of the stimulation's durations reward_time : numpy.ndarray of int A 1D numpy.ndarray of the reward times in frames. timeout_time : numpy.ndarray of int A 1D numpy.ndarray of the timeout (no-go times) in frames. lick_time : numpy.ndarray of int A 1D numpy.ndarray of the lick times in frames. detected_stim : numpy.ndarray of bool A 1D numpy.ndarray of if the mouse responded to a stimulation. mlr_labels_exc : TODO: to fill mlr_labels_inh : TODO: to fill json : list[dict] Read of the file params_trials.json zscore_exc : numpy.ndarray of float A 2D numpy.ndarray of the zscore of each excitatory neuron at each frame of the recording (nb neurons * nb frames) zscore_inh : numpy.ndarray of float A 2D numpy.ndarray of the zscore of each inhibitory neuron at each frame of the recording (nb neurons * nb frames) """ def __init__(self, input_path, starting_trial, rois_path, tuple_mesc=None, mean_f=False, correction=False, cache=True, iti="ITI2", habituation=False): """ Parameters ---------- input_path : str Path to the folder that contains the input files. starting_trial : int The starting trial number. rois_path: str Path to the Excel ROI file. tuple_mesc : tuple, optional Tuple representing the measurements per second (mesc) values for extraction, default is (0, 0) -> to take the session number from the folder, if cha. correction : bool, optional Flag indicating whether to apply correction, default is False. cache: bool Boolean value indicating whether to use cached files if available. analog_sf : int, optional Sampling frequency of the analog data, default is 10000. """ detection = not habituation super().__init__(input_path, rois_path, mean_f, cache=cache, detection=detection) self.xls = pd.read_excel(input_path + 'bpod.xls', header=None) self.stim_time = [] self.stim_ampl = [] self.stim_durations = [] self.reward_time = [] self.timeout_time = [] self.lick_time = [] self.detected_stim = [] self.mlr_labels_exc = {} self.mlr_labels_inh = {} session_mesc = tuple_mesc if tuple_mesc is not None else (int(self.session_id[0]), int(self.session_id[1])) with open(input_path + 'params_trial.json', "r") as read_file: # a list of dictionaries, 1 per trial (nb, freq, amp, trial_type) self.json = json.load(read_file) if os.path.exists(input_path + 'behavior_events.json') and cache: print(f"{self.filename}: Behavioural information already incorporated in the analog.") with open(input_path + 'behavior_events.json', "r") as events_file: events = json.load(events_file) self.stim_time = np.array(events["stim_time"]) self.stim_ampl = np.array(events["stim_ampl"]) self.reward_time = np.array(events["reward_time"]) self.timeout_time = np.array(events["timeout_time"]) self.detected_stim = np.array(events["detected_stim"]) self.stim_durations = np.array(events["stim_durations"]) self.lick_time = np.array(events["lick_time"]) else: if not os.path.exists(input_path + 'analog.txt'): mesc_file = [file for file in os.listdir(input_path) if file.endswith(".mesc")] if mesc_file: extract_analog_from_mesc(input_path + mesc_file[0], session_mesc, self.sf, savepath=input_path) else: print(f"{self.filename}: No analog.txt either mesc file in the folder!") return self.analog = pd.read_csv(input_path + 'analog.txt', sep="\t", header=None) if (len(self.analog[0])/10000) > 600: self.analog[0] = (self.analog[0] * 10).astype(int) analog_sf = 10000 else: self.analog[0] = (self.analog[0]).astype(int) analog_sf = 1000 iti_dict = {True: "ITI1", False: "ITI2", "ITI": "ITI"} iti = iti_dict[self.iti1_only] print(f"{self.filename}: {iti}") self.synchronization_with_iti(starting_trial, analog_sf, correction, iti, habituation) self.zscore_exc = self.zscore(self.df_f_exc) self.zscore_inh = self.zscore(self.df_f_inh) # removing timeout_times that comes after the end of the neuronal record self.timeout_time = np.array([i for i in self.timeout_time if i <= self.zscore_exc.shape[1]]) if max(self.stim_durations) == 0: self.stim_durations = np.full_like(self.stim_time, 15, dtype=int)
[docs] def synchronization_with_iti(self, starting_trial, analog_s, correction, iti, habituation): """ Update the analog file with information on stimulus time, reward time and timeout time Note ------- Get the ITI2, stimulus and reward time from the Excel Synchronize the ITI2 from the Excel with the ITI from the analog Parameters ------- excel file, analog file, starting trial for the recording (relative to the behavioral trials) analog_s analog sampling frequency starting_trial Output ------ Updated analog file (csv) with the stimulus, timeout, reward time """ print(f"{self.filename}: Synchronization method with ITI.") # get the ITI2, reward, timeout and stimulus from the xls if iti == "ITI2": data_iti = self.xls[self.xls[0] == 'STATE'][self.xls[4] == 'ITI2'] else: data_iti = self.xls[self.xls[0] == 'STATE'][self.xls[4] == 'ITI'] data_reward = self.xls[self.xls[0] == 'STATE'][self.xls[4] == 'Reward'] data_timeout = self.xls[self.xls[0] == 'STATE'][self.xls[4] == 'Timeout'] data_stimulus = self.xls[self.xls[0] == 'STATE'][self.xls[4] == 'Stimulus'] splitted = np.split(self.xls, self.xls[self.xls[4] == 'The trial ended'].index) if iti == "ITI2": licks = [np.round(((split[2][(split[0] == 'EVENT') & (split[4] == 94)].values - 2) * analog_s).astype(float)) for split in splitted] else: licks = [np.round(((split[2][(split[0] == 'EVENT') & (split[4] == 94)].values ) * analog_s).astype(float)) for split in splitted] # get the time for ITI2, reward, timeout and stimulus ITI_time_xls = data_iti[2].values.astype(float) reward_time = data_reward[2].values.astype(float) timeout_time = data_timeout[2].values.astype(float) stimulus_time = data_stimulus[2].values.astype(float) # calculate the time of reward and timeout relative to ITI2 if iti == "ITI2": reward_time_to_ITI = np.around((reward_time - 2) * analog_s).tolist() # in ms timeout_time_to_ITI = np.around((timeout_time - 2) * analog_s).tolist() # in ms stimulus_time_to_ITI = np.around((stimulus_time - 2) * analog_s).tolist() # in ms else: reward_time_to_ITI = np.around((reward_time ) * analog_s).tolist() # in ms timeout_time_to_ITI = np.around((timeout_time ) * analog_s).tolist() # in ms stimulus_time_to_ITI = np.around((stimulus_time ) * analog_s).tolist() # in ms self.analog.columns = ['t', 'stimulus', 't_iti', 'iti'] # Get the ITI2 from the analog file, as the first "1" value in the digital input of the analog file index_iti_final = [] index_iti_analog = self.analog.index[self.analog['iti'] == 1].tolist() for index, elem in enumerate(index_iti_analog): if index == 0: index_iti_final.append(elem) elif elem - index_iti_analog[index - 1] > 1: index_iti_final.append(elem) # index_iti_final.append(index_iti_analog[-1]) # correction of temporal shift due to instability of frame rate # if correction: # timestamps = np.load(self.input_path + "timestamps_frames.npy") # get the info for the recorded trials and align it with the excel reward_to_analog = [] timeout_to_analog = [] stimulus_to_analog = [] ampl_recording = [] licks_to_analog = [] end_protocol = len(index_iti_final) + starting_trial # Get lists for the reward, timeout and stimulus from the xls calculations for the trials that were recorded for icount in range(starting_trial, len(ITI_time_xls)): if icount < end_protocol: if not habituation: # TODO here reward_to_analog.append(reward_time_to_ITI[icount]) timeout_to_analog.append(timeout_time_to_ITI[icount]) stimulus_to_analog.append(stimulus_time_to_ITI[icount]) licks_to_analog.append(licks[icount]) ampl_recording.append(self.json[icount]["amp"]) ampl_recording_iter = iter(ampl_recording) for icount_time in range(len(index_iti_final)): ITI_time_analog = self.analog.at[index_iti_final[icount_time], 't'] if not habituation: #TODO here reward_time_analog = ITI_time_analog + reward_to_analog[icount_time] timeout_time_analog = ITI_time_analog + timeout_to_analog[icount_time] stimulus_time_analog = ITI_time_analog + stimulus_to_analog[icount_time] licks_time_analog = ITI_time_analog + licks_to_analog[icount_time] if not habituation: # TODO here index_reward = self.analog.index[self.analog['t'] == reward_time_analog].to_list() index_timeout = self.analog.index[self.analog['t'] == timeout_time_analog].to_list() index_stimulus = self.analog.index[self.analog['t'] == stimulus_time_analog].to_list() for lick in licks_time_analog: index_licks = self.analog.index[self.analog['t'] == lick].to_list() if len(index_licks) != 0: lick_time = int((index_licks[0] / analog_s) * self.sf) if correction: lick_time = lick_time - int(((1 / self.sf) * (index_licks[0] / analog_s)) * (1 / 3)) # lick_time = get_idx_frame_mesc(index_licks[0], timestamps) self.lick_time.append(lick_time) if not habituation and len(index_reward) != 0: #TODO here index_rew= int((index_reward[0] / analog_s) * self.sf) if correction: self.reward_time.append( index_rew - int(((1 / self.sf) * (index_reward[0] / analog_s)) * (1 / 3))) else: self.reward_time.append(index_rew) if not habituation and len(index_timeout) != 0: #TODO here if len(index_stimulus) == 0: timeout_trial = next(ampl_recording_iter) index_timeo = int((index_timeout[0] / analog_s) * self.sf) if correction: self.timeout_time.append(index_timeo - int(((1 / self.sf) * (index_timeout[0] / analog_s)) * (1 / 3))) else: self.timeout_time.append(index_timeo) if len(index_stimulus) != 0: amp = next(ampl_recording_iter) index_stim = int((index_stimulus[0] / analog_s) * self.sf) if correction: self.stim_time.append(index_stim - int(((1 / self.sf) * (index_stimulus[0] / analog_s)) * (1 / 3))) # self.stim_time.append(get_idx_frame_mesc(index_stimulus[0], timestamps)) else: self.stim_time.append(index_stim) self.stim_ampl.append(amp) if not habituation and len(index_reward) != 0: #TODO here self.detected_stim.append(True) else: self.detected_stim.append(False) stim_ampl = np.around(self.stim_ampl, decimals=1) stim_ampl_sort = np.sort(np.unique(stim_ampl)) convert = {1: [14], 4: [4, 6, 8, 10], 5: [4, 6, 8, 10, 12], 6: [2, 4, 6, 8, 10, 12], 7: [0, 2, 4, 6, 8, 10, 12]} for i in range(len(stim_ampl_sort)): stim_ampl[stim_ampl == stim_ampl_sort[i]] = convert[len(stim_ampl_sort)][i] self.stim_ampl = stim_ampl self.stim_time = np.array(self.stim_time) self.reward_time = np.array(self.reward_time) self.timeout_time = np.array(self.timeout_time) self.detected_stim = np.array(self.detected_stim) self.lick_time = np.array(self.lick_time) # stim duration extraction durations = np.zeros(len(self.stim_time)) if not habituation: for i, stim_t in enumerate(self.stim_time): diff_ar = np.absolute(self.reward_time - stim_t) if diff_ar[diff_ar.argmin()] >= int(0.5 * self.sf) - 1: durations[i] = int(0.5*self.sf) else: durations[i] = diff_ar[diff_ar.argmin()] self.stim_durations = durations # self.stim_time = self.stim_time[self.stim_time < len(self.df_f_exc[0])] # self.stim_ampl = self.stim_ampl[:len(self.stim_time )] # self.reward_time = self.reward_time[:len(self.stim_time)] # self.stim_durations = self.stim_durations[:len(self.stim_time)] to_save = {"stim_time": self.stim_time.tolist(), "stim_ampl": stim_ampl.tolist(), "stim_durations": self.stim_durations.tolist(), "reward_time": self.reward_time.tolist(), "timeout_time": self.timeout_time.tolist(), "detected_stim": self.detected_stim.tolist(), "lick_time": self.lick_time.tolist() } with open(self.input_path + "behavior_events.json", "w") as jsn: json.dump(to_save, jsn)
def mlr(self, mlr_model, name_model): if os.path.exists(self.input_path + name_model + '.json'): print('MLR model already computed') with open(self.input_path + name_model + '.json', "r") as events_file: events = json.load(events_file) self.mlr_labels_exc = events["exc"] self.mlr_labels_inh = events["inh"] else: self.mlr_labels_exc["text_labels"], self.mlr_labels_exc["n_neurons_per_label"], self.mlr_labels_exc[ "neuron_labels"], self.mlr_labels_exc["indices_r2"] = mlr(self.zscore_exc, mlr_model, self.sf) self.mlr_labels_inh["text_labels"], self.mlr_labels_inh["n_neurons_per_label"], self.mlr_labels_inh[ "neuron_labels"], self.mlr_labels_inh["indices_r2"] = mlr(self.zscore_inh, mlr_model, self.sf) def convert_to_list(obj): return {key: value.tolist() for key, value in obj.items()} exc_list = convert_to_list(self.mlr_labels_exc) inh_list = convert_to_list(self.mlr_labels_inh) to_save_list = {"exc": exc_list, "inh": inh_list} with open(self.input_path + name_model + ".json", "w") as jsn: json.dump(to_save_list, jsn) def stim_ampl_filter(self, stim_ampl="all", include_no_go=False, return_amps=False): """ Returns a vector of booleans to select the stimulations of desired amplitude. Parameters ---------- stim_ampl : str or list[int] The amplitudes of stimulation that we want to select. List of absolute values or relative to threshold (threshold, supra, sub, supra_wt_threshold, sub_wt_threshold or all) include_no_go : bool (optional, default = False) Whether to include no-go trials (amplitude 0) or not. Returns ------- numpy.ndarray[bool] A vector of booleans to select the stimulations of desired amplitude. """ if include_no_go: all_ampl = np.arange(0, 14, 2) else: all_ampl = np.arange(2, 14, 2) if stim_ampl == "threshold": amplitudes = self.threshold elif stim_ampl == "session_threshold": amplitudes = self.session_threshold elif stim_ampl == "sub_amp": amplitudes = self.session_threshold - 2 if self.session_threshold != 2 else None elif stim_ampl == "supra_amp": amplitudes = self.session_threshold + 2 if self.session_threshold != 12 else None elif stim_ampl == "supra": amplitudes = all_ampl[all_ampl >= self.threshold] elif stim_ampl == "sub": amplitudes = all_ampl[all_ampl < self.threshold] elif stim_ampl == "all": amplitudes = all_ampl elif stim_ampl == "supra_wt_threshold": amplitudes = all_ampl[all_ampl > 6] elif stim_ampl == "sub_wt_threshold": amplitudes = all_ampl[all_ampl < 6] else: amplitudes = np.array(stim_ampl) selected_stim = np.isin(self.stim_ampl, amplitudes) return amplitudes if return_amps else selected_stim def timeout_filter(self, no_lick_period_duration): mask = [] for time in self.timeout_time: pre_time = [time - (i + 1) for i in range(int(no_lick_period_duration * self.sf))] mask.append(not any(np.isin(pre_time, self.lick_time))) return mask def get_estimated_activity(self, zscore=True, n_type="EXC", estimator="mean", real_duration=True): assert n_type in ["EXC", "INH"], "Please provide a valid neuron type (EXC or INH)" if zscore: activity = self.zscore_exc if n_type == "EXC" else self.zscore_inh else: activity = self.df_f_exc if n_type == "EXC" else self.df_f_inh est_act_mat = np.zeros_like(self.matrices[n_type]["Responsivity"], dtype=float) for n_id, neuron in enumerate(activity): for trial_id, (trial_start, trial_duration) in enumerate(zip(self.stim_time, self.stim_durations)): if not real_duration: trial_duration = 15 data = neuron[trial_start : trial_start + int(trial_duration)] est_act_mat[n_id, trial_id] = data.mean() if estimator == "mean" else (data.std() if estimator == "std" else np.nan) return est_act_mat def reliable_responders(self): """Defines which neurons are considered as reliable responders. Here we choose to consider every neuron that is recruited in more than 5 detected trials.""" resp_exc = self.matrices['EXC']["Responsivity"][:, self.detected_stim] resp_inh = self.matrices['INH']["Responsivity"][:, self.detected_stim] self.reliable_exc = np.argwhere(np.count_nonzero(resp_exc != 0, axis=1) > 5).flatten() self.reliable_inh = np.argwhere(np.count_nonzero(resp_inh != 0, axis=1) > 5).flatten() self.reliable_act_exc = np.argwhere(np.count_nonzero(resp_exc == 1, axis=1) > 5).flatten() self.reliable_inh_exc = np.argwhere(np.count_nonzero(resp_exc == -1, axis=1) > 5).flatten() self.reliable_act_inh = np.argwhere(np.count_nonzero(resp_inh == 1, axis=1) > 5).flatten() self.reliable_inh_inh = np.argwhere(np.count_nonzero(resp_inh == -1, axis=1) > 5).flatten()
if __name__ == '__main__': import percephone.plts.heatmap as hm from percephone.analysis.utils import corrected_prestim_windows # directory = "/datas/Théo/Projects/Percephone/data/Amplitude_Detection/Amplitude_Detection_DMSO_BMS/" # roi_info = directory + "Fmko_bms&dmso_info.xlsx" # folder = "20231107_5893_00_BMS_synchro" # path_to_mesc = directory + "20231107_5893_BMS_det.mesc" # # folder = "20231107_5886_00_DMSO_synchro" # # path_to_mesc = directory + "20231107_5886_DMSO_det.mesc" # analog = pd.read_csv(directory + folder + "/"+"analog.txt", sep="\t", header=None) # cut_idx = np.argwhere(np.diff(analog.iloc[:, 3]) > 0)[-1] # analog_cropped = np.array(analog.iloc[:cut_idx[0]]) # np.savetxt(directory + folder + "/"+"analog.txt", analog_cropped, fmt='%.8g', delimiter="\t") # plt.plot(analog_cropped.iloc[:, 3]) # extract_analog_from_mesc(path_to_mesc, (0, 0), 30.9609, 20000, directory + folder + "/") # rec = RecordingAmplDet(directory + folder + "/", 0, roi_info, cache=False, correction=True) # rec.stim_time = corrected_prestim_windows(rec) # hm.interactive_heatmap(rec, rec.zscore_exc) # from percephone.plts.heatmap import ordered_heatmap, get_zscore # zsc, t_stim = get_zscore(rec, exc_neurons=True, sort=True, amp_sort=True) # ordered_heatmap(rec, exc_neurons=True, inh_neurons=False, time_span="pre_stim", det_sorted=True, amp_sorted=True) # # directory ="/datas/Théo/Projects/Percephone/data/Amplitude_Detection/loop_format_tau_02/" # roi_info = directory + "/FmKO_ROIs&inhibitory.xlsx" # folder = "20231008_5890_03_synchro" # rec = RecordingAmplDet(directory + folder + "/", 0, roi_info, cache=True, correction=False) # rec.stim_time = corrected_prestim_windows(rec) # # hm.interactive_heatmap(rec, rec.zscore_inh) from percephone.plts.heatmap import ordered_heatmap, get_zscore # zsc, t_stim = get_zscore(rec, exc_neurons=True, sort=True, amp_sort=True) # ordered_heatmap(rec, exc_neurons=False, inh_neurons=True, det_sorted=True, amp_sorted=True) # ordered_heatmap(rec, exc_neurons=False, inh_neurons=True, time_span="pre_stim", det_sorted=True, amp_sorted=True) #06-12-2024 tst ourania new recordings # directory = "/datas/Théo/Projects/Percephone/data/Amplitude_Detection/testing_activity_DMSO/" # roi_info = directory + "FmKO_ROIs&inhibitory.xlsx" # folder = "20241217_7539_00_synchro" # rec = RecordingAmplDet(directory + folder + "/", 0, roi_info, cache=False, correction=True, iti="ITI2") # hm.interactive_heatmap(rec, rec.zscore_exc) # ordered_heatmap(rec, exc_neurons=True, inh_neurons=False, time_span="stim", det_sorted=True, amp_sorted=True) # first batch recording # directory = "/datas/Théo/Projects/Percephone/data/Amplitude_Detection/loop_format_tau_02/" # roi_info = directory + "FmKO_ROIs&inhibitory.xlsx" # folder = "20220715_4456_00_synchro" # rec = RecordingAmplDet(directory + folder + "/", 0, roi_info, cache=False, correction=True, iti="ITI2") # hm.interactive_heatmap(rec, rec.zscore_exc) # ordered_heatmap(rec, exc_neurons=True, inh_neurons=False, time_span="stim", det_sorted=True, amp_sorted=True) #10-12-2024 # directory = "/datas/Théo/Projects/Percephone/data/Amplitude_Detection/testing_activity_DMSO/" # roi_info = directory + "FmKO_ROIs&inhibitory.xlsx" # folder = "20241211_7539_02_synchro" # extract_analog_from_mesc(directory + folder + "/20241211_DMSO_7539.mesc", (0, 2), 30.9609, 20000, directory + folder + "/") # rec = RecordingAmplDet(directory + folder + "/", 0, roi_info, cache=False, correction=False) # hm.interactive_heatmap(rec, rec.zscore_exc) # ordered_heatmap(rec, exc_neurons=True, inh_neurons=False, time_span="pre_stim", det_sorted=True, amp_sorted=True) #Test new recordings 1 amp 07-01-2025 directory = "Z:/Current_members/Ourania_Semelidou/2p/Ca_imaging_analysis_PreSynchro/Fmko/Habituation/" roi_info = directory + "FmKO_ROIs&inhibitory.xlsx" folder = "20250103_7553_00_synchro" rec = RecordingAmplDet(directory + folder + "/", 0, roi_info, cache=False, correction=True, iti="ITI2", habituation=True) hm.interactive_heatmap(rec, rec.zscore_exc) #Test old recording # directory = "C:/Users/cvandromme/Desktop/Data/" # roi_info = "C:/Users/cvandromme/Desktop/FmKO_ROIs&inhibitory.xlsx" # folder = "20220710_4445_00_synchro" # rec = RecordingAmplDet(directory + folder + "/", 0, roi_info, cache=False, correction=True, iti="ITI2", # habituation=False) # hm.interactive_heatmap(rec, rec.zscore_exc)