import numpy as np
import pandas as pd
import mne
import copy
from typing import List
from meg_qc.plotting.universal_plots import QC_derivative, assign_epoched_std_ptp_to_channels
from meg_qc.plotting.universal_html_report import simple_metric_basic
from meg_qc.calculation.initial_meg_qc import (chs_dict_to_csv,load_data)
[docs]def get_std_all_data(data: mne.io.Raw, channels: List):
"""
Calculate std for each channel - for the entire time duration.
Parameters
----------
data : mne.io.Raw
raw data
channels : List
list of channel names
Returns
-------
std_channels_named : dict
dictionary with channel names and their std values
"""
data_channels = data.get_data(picks=channels)
std_channels = np.std(data_channels, axis=1)
# Add channel name for every std value:
std_channels_named = {ch: std for ch, std in zip(channels, std_channels)}
return std_channels_named
[docs]def get_big_small_std_ptp_all_data(ptp_or_std_channels_named: dict, channels: List, std_multiplier: float):
"""
Function calculates peak-to-peak amplitude or STDs over the entire data set for each channel.
Threshold for noisy = mean + multiplier*std, above it - noisy,
Threshold for flat = mean - multiplier*std, below it - flat,
where:
- mean is mean stds/ptp values over all channels
- std is standard deviation of stds values over all channels
- multiplier is a parameter set in config, defines how many stds/ptp above or below mean should be taken as threshold.
Parameters
----------
ptp_or_std_channels_named : dict
peak-to-peak amplitude or std for each channel
channels : List
list of channel names
std_multiplier : float
multipliar for std, used to define thresholds for noisy and flat channels
Returns
-------
noisy_channels : dict
dictionary with channel names and their stds/ptp values. Noisy channels.
flat_channels : dict
dictionary with channel names and their stds/ptp values. Flat channels.
"""
# Put all values in 1 array from the dictionsry:
ptp_or_std_channels = np.array(list(ptp_or_std_channels_named.values()))
## Check if channel data is within std level of PtP/std.
std_of_measure_channels=np.std(ptp_or_std_channels)
mean_of_measure_channels=np.mean(ptp_or_std_channels)
print('___MEGqc___: ', mean_of_measure_channels + std_multiplier*std_of_measure_channels, ' threshold for NOISY. ')
print('___MEGqc___: ', mean_of_measure_channels - std_multiplier*std_of_measure_channels, ' threshold for FLAT. ')
# Find the index of channels with biggest and smallest std:
ch_ind_big_measure = [index for (index, item) in enumerate(ptp_or_std_channels) if item > mean_of_measure_channels + std_multiplier*std_of_measure_channels] #find index with bigst std
ch_ind_small_measure = [index for (index, item) in enumerate(ptp_or_std_channels) if item < mean_of_measure_channels - std_multiplier*std_of_measure_channels] #find index with smallest std
#make dictionaries with channel names and their std values:
noisy_channels = {}
flat_channels = {}
for index in ch_ind_big_measure:
ch_name = np.array(channels)[index]
noisy_channels[ch_name] = ptp_or_std_channels[index]
for index in ch_ind_small_measure:
ch_name = np.array(channels)[index]
flat_channels[ch_name] = ptp_or_std_channels[index]
return noisy_channels, flat_channels
#%%
[docs]def get_std_epochs(channels: List, epochs_mg: mne.Epochs):
"""
Calculate std for multiple epochs for a list of channels.
Used as internal function in std_meg_epoch()
Parameters
----------
channels : List
list of channel names
epochs_mg : mne.Epochs
epochs data as mne.Epochs object
Returns
-------
pd.DataFrame
dataframe with std values for each channel and each epoch
"""
# Get the data of all epochs and channels at once
data_epochs = epochs_mg.get_data(picks=channels) # shape: (n_epochs, n_channels, n_times)
# Compute the standard deviation along the time axis (axis=2) for each channel in every epoch
std_array = np.std(data_epochs, axis=2) # shape: (n_epochs, n_channels)
# Transpose so that rows represent channels and columns represent epochs, matching the original version
return pd.DataFrame(std_array.T, index=channels)
[docs]def get_noisy_flat_std_ptp_epochs(df_std: pd.DataFrame, ch_type: str, std_or_ptp: str, noisy_channel_multiplier: float, flat_multiplier: float, percent_noisy_flat_allowed: float):
"""
1. Define if the channels data inside the epoch is noisy or flat:
Compare the std of this channel for this epoch (df_std) to the mean STD of this particular channel or over all epchs.
- If std of this channel for this epoch is over the mean std of this channel for all epochs together * flat multiplyer, then the data for this channel in this epoch is noisy.
- If std of this channel for this epoch is under the mean std of this channel for all epochs together * noisy multiplyer, then this the data for this channel in this epoch is flat.
Multiplyer is set by user in the config file.
2. Count how many channels are noisy/flat in each epoch.
If more than percent_noisy_flat_allowed of channels are noisy/flat, then this epoch is noisy/flat.
Percent is set by user in the config file.
3. Create MEG_QC_derivative as 3 dfs:
- df_epoch_vs_mean: ratio of std of this channel for this epoch to the mean std of this channel over all epochs together
- df_noisy_epoch: df with True/False values for each channel in each epoch, True if this channel is noisy in this epoch
- df_flat_epoch: df with True/False values for each channel in each epoch, True if this channel is flat in this epoch
Parameters
----------
df_std : pd.DataFrame
dataframe with std/ptp values for each channel and each epoch
ch_type : str
channel type, 'mag', 'grad'
std_or_ptp : str
'std' or 'ptp' - to use std or peak to peak amplitude as a metric
noisy_channel_multiplier : float
multiplier to define noisy channel, if std of this channel for this epoch is over (the mean std of this channel for all epochs together*multipliar), then this channel is noisy
set by user in the config file
flat_multiplier : float
multiplier to define flat channel, if std of this channel for this epoch is under (the mean std of this channel for all epochs together*multipliar), then this channel is flat
set by user in the config file
percent_noisy_flat_allowed : float
percent of noisy/flat channels allowed in each epoch, if more than this percent, then this epoch is noisy/flat. Example: 70
Means that if more than 70% of channels are noisy/flat in this epoch, then this epoch is noisy/flat.
Returns
-------
list
list of 3 MEG_QC_derivative objects:
- df_epoch_vs_mean: ratio of std of this channel for this epoch to the mean std of this channel over all epochs together
- df_noisy_epoch: df with True/False values for each channel in each epoch, True if this channel is noisy in this epoch
- df_flat_epoch: df with True/False values for each channel in each epoch, True if this channel is flat in this epoch
"""
epochs = [int(ep) for ep in df_std.columns.tolist()] #get epoch numbers
# Save true channel count BEFORE adding summary rows so the denominator
# in percentage / threshold calculations is always correct.
n_channels = len(df_std.index)
# Make a separate DataFrame and calculate the mean of stds for each channel over all epochs together
df_std_with_mean = df_std.copy()
df_std_with_mean['mean'] = df_std_with_mean.mean(axis=1)
# Avoid division by zero when the channel is constant by treating the mean
# of zero as missing data. Subsequent comparisons will skip these entries.
mean_series = df_std_with_mean['mean'].replace(0, np.nan)
#TODO: check mwhy all mean tuen into 1 in tsv???
# Compare mean std of each channel to std of this channel for every epoch and convert to boolean type
df_noisy_epoch = df_std_with_mean.copy().astype(bool)
df_flat_epoch = df_std_with_mean.copy().astype(bool)
df_epoch_vs_mean = df_std_with_mean.copy()
# Now see which channles in epoch are over std_level or under -std_level:
#append raws to df_noisy_epoch to hold the % of noisy/flat channels in each epoch:
# NOTE: use the SAME label string here as is written in the loop and read in
# make_dict_local_std_ptp, otherwise pandas creates a duplicate "ghost" row
# which inflates len(df) and corrupts iloc[:-3] slicing.
df_noisy_epoch.loc['number noisy channels'] = np.nan
df_noisy_epoch.loc['% noisy channels'] = np.nan
df_noisy_epoch.loc['noisy < %s perc' % percent_noisy_flat_allowed] = np.nan
df_flat_epoch.loc['number flat channels'] = np.nan
df_flat_epoch.loc['% flat channels'] = np.nan
df_flat_epoch.loc['flat < %s perc' % percent_noisy_flat_allowed] = np.nan
for ep in epochs:
ratio = df_std_with_mean.iloc[:, ep] / mean_series
# ratio is NaN for channels with zero mean; these are marked not noisy/flat
# by replacing NaN with False in the boolean comparison results
df_epoch_vs_mean.iloc[:, ep] = ratio
# Align to original channel index to avoid broadcasting issues
noisy_mask = (ratio > noisy_channel_multiplier).fillna(False)
flat_mask = (ratio < flat_multiplier).fillna(False)
df_noisy_epoch.loc[df_std.index, ep] = noisy_mask.values
df_flat_epoch.loc[df_std.index, ep] = flat_mask.values
# Calculate the number of noisy/flat channels in this epoch:
df_noisy_epoch.loc['number noisy channels', ep] = df_noisy_epoch.iloc[:-3,ep].sum()
df_flat_epoch.loc['number flat channels', ep] = df_flat_epoch.iloc[:-3,ep].sum()
# Calculate percent of noisy/flat channels in this epoch.
# Use n_channels (saved before summary rows were added) as the denominator
# so that the 3 summary rows are never counted in the percentage.
df_noisy_epoch.loc['% noisy channels', ep] = round(df_noisy_epoch.iloc[:-3,ep].sum() / n_channels * 100, 1)
df_flat_epoch.loc['% flat channels', ep] = round(df_flat_epoch.iloc[:-3,ep].sum() / n_channels * 100, 1)
# Now check if the epoch has over percent_noisy_flat_allowed% of noisy/flat channels:
df_noisy_epoch.loc['noisy < %s perc' % percent_noisy_flat_allowed, ep] = df_noisy_epoch.iloc[:-3,ep].sum() > n_channels * percent_noisy_flat_allowed / 100
df_flat_epoch.loc['flat < %s perc' % percent_noisy_flat_allowed, ep] = df_flat_epoch.iloc[:-3,ep].sum() > n_channels * percent_noisy_flat_allowed / 100
# Create derivatives:
noisy_flat_epochs_derivs = [
QC_derivative(df_epoch_vs_mean, std_or_ptp+'_per_epoch_vs_mean_ratio_'+ch_type, 'df'),
QC_derivative(df_noisy_epoch, 'Noisy_epochs_on_'+std_or_ptp+'_base_'+ch_type, 'df'),
QC_derivative(df_flat_epoch, 'Flat_epochs_on_'+std_or_ptp+'_base_'+ch_type, 'df')]
return noisy_flat_epochs_derivs
#%% All about simple metric jsons:
[docs]def make_dict_global_std_ptp(std_ptp_params: dict, big_std_with_value_all_data: List[dict], small_std_with_value_all_data: List[dict], channels: List[str], std_or_ptp: str):
"""Make a dictionary with global metric content for std or ptp metric.
Global means that it is calculated over entire data series, not over epochs.
Parameters
----------
std_ptp_params : dict
dictionary with parameters for std or ptp metric
big_std_with_value_all_data : List
list of dictionaries (channel_name: value) for channels with big std or ptp
small_std_with_value_all_data : List
list of dictionaries (channel_name: value) for channels with small std or ptp
channels : List
list of channel names
std_or_ptp : str
'std' or 'ptp': use STD or Peak-to-peak metric
Returns
-------
metric_global_content : dict
dictionary with global metric content for std or ptp metric
"""
global_details = {
'noisy_ch': big_std_with_value_all_data,
'flat_ch': small_std_with_value_all_data}
metric_global_content = {
'number_of_noisy_ch': len(big_std_with_value_all_data),
'percent_of_noisy_ch': round(len(big_std_with_value_all_data)/len(channels)*100, 1),
'number_of_flat_ch': len(small_std_with_value_all_data),
'percent_of_flat_ch': round(len(small_std_with_value_all_data)/len(channels)*100, 1),
std_or_ptp+'_lvl': std_ptp_params['std_lvl'],
'details': global_details}
return metric_global_content
[docs]def make_dict_local_std_ptp(std_ptp_params: dict, noisy_epochs_df: pd.DataFrame, flat_epochs_df: pd.DataFrame, percent_noisy_flat_allowed: float=70, onset_times: list=None):
"""
Make a dictionary with local metric content for std or ptp metric.
Local means that it is calculated over epochs.
Parameters
----------
std_ptp_params : dict
dictionary with parameters for std or ptp metric, originally from config file
noisy_epochs_df : pd.DataFrame
dataframe with True/False values for noisy channels in each epoch
flat_epochs_df : pd.DataFrame
dataframe with True/False values for flat channels in each epoch
percent_noisy_flat_allowed : float
percent of noisy/flat channels allowed in each epoch, if more than this percent, then this epoch is noisy/flat. Example: 70
Returns
-------
metric_local_content : dict
dictionary with local metric content for std or ptp metric
"""
epochs = noisy_epochs_df.columns.tolist()
epochs = [int(ep) for ep in epochs[:-1]]
epochs_details = []
for i, ep in enumerate(epochs):
onset_s = round(float(onset_times[i]), 4) if (onset_times is not None and i < len(onset_times)) else None
epochs_details += [{'epoch': ep, 'onset_time_s': onset_s, 'number_of_noisy_ch': int(noisy_epochs_df.loc['number noisy channels',ep]), 'perc_of_noisy_ch': float(noisy_epochs_df.loc['% noisy channels',ep]), 'epoch_too_noisy': noisy_epochs_df.loc['noisy < %s perc' % percent_noisy_flat_allowed, ep], 'number_of_flat_ch': int(flat_epochs_df.loc['number flat channels', ep]), 'perc_of_flat_ch': float(flat_epochs_df.loc['% flat channels', ep]), 'epoch_too_flat': flat_epochs_df.loc['flat < %s perc' % percent_noisy_flat_allowed,ep]}]
total_num_noisy_ep=len([ep for ep in epochs_details if ep['epoch_too_noisy'] is True])
total_perc_noisy_ep=round(total_num_noisy_ep/len(epochs)*100)
total_num_flat_ep=len([ep for ep in epochs_details if ep['epoch_too_flat'] is True])
total_perc_flat_ep=round(total_num_flat_ep/len(epochs)*100)
metric_local_content={
'allow_percent_noisy_flat_epochs': std_ptp_params['allow_percent_noisy_flat_epochs'],
'noisy_channel_multiplier': std_ptp_params['noisy_channel_multiplier'],
'flat_multiplier': std_ptp_params['flat_multiplier'],
'total_num_noisy_ep': total_num_noisy_ep,
'total_perc_noisy_ep': total_perc_noisy_ep,
'total_num_flat_ep': total_num_flat_ep,
'total_perc_flat_ep': total_perc_flat_ep,
'details': epochs_details}
return metric_local_content
[docs]def make_simple_metric_std(std_params: dict, big_std_with_value_all_data: List[dict], small_std_with_value_all_data: List[dict], channels: List[str], deriv_epoch_std: dict, metric_local_present: bool, m_or_g_chosen: List[dict], onset_times: list=None):
"""
Make simple metric for STD.
Parameters
----------
std_params : dict
dictionary with parameters for std metric, originally from config file
big_std_with_value_all_data : List
list of dictionaries (channel_name: value) for channels with big std
small_std_with_value_all_data : List
list of dictionaries (channel_name: value) for channels with small std
channels : List
list of channel names
deriv_epoch_std : dict
dictionary with QC_derivative objects containing data frames.
Used only data frame 1 and 2.
1: contains True/False values for noisy channels in each epoch.
2: contains True/False values for flat channels in each epoch.
metric_local_present : bool
True if local metric was calculated (epochs present). False if not calculated (epochs were not detected).
m_or_g_chosen : List
list of strings with channel types chosen by user: ['mag', 'grad'] or ['mag'] or ['grad']
Returns
-------
dict
dictionary with simple metric for std/ptp
"""
metric_global_name = 'STD_all_time_series'
metric_global_description = 'Standard deviation of the data over the entire time series (not epoched): the number of noisy channels depends on the std of the data over all channels. The std level is set by the user. Noisy channel: The channel where std of data is higher than threshod: mean_over_all_stds_channel + (std_of_all_channels*std_lvl). Flat: where std of data is lower than threshld: mean_over_all_stds_channel - (std_of_all_channels*std_lvl). In details only the noisy/flat channels are listed. Channels with normal std are not listed. If needed to see all channels data - use csv files.'
metric_local_name = 'STD_epoch'
if metric_local_present==True:
metric_local_description = 'Standard deviation of the data over stimulus-based epochs. The epoch is counted as noisy (or flat) if the percentage of noisy (or flat) channels in this epoch is over allow_percent_noisy_flat. this percent is set by user, default=70%. Hense, if no epochs have over 70% of noisy channels - total number of noisy epochs will be 0. Definition of a noisy channel inside of epoch: 1)Take std of data of THIS channel in THIS epoch. 2) Take std of the data of THIS channel for ALL epochs and get mean of it. 3) If (1) is higher than (2)*noisy_channel_multiplier - this channel is noisy. If (1) is lower than (2)*flat_multiplier - this channel is flat.'
else:
metric_local_description = 'Not calculated. No epochs found'
metric_global_content={'mag': None, 'grad': None, 'eeg': None}
metric_local_content={'mag': None, 'grad': None, 'eeg': None}
for m_or_g in m_or_g_chosen:
metric_global_content[m_or_g]=make_dict_global_std_ptp(std_params, big_std_with_value_all_data[m_or_g], small_std_with_value_all_data[m_or_g], channels[m_or_g], 'std')
if metric_local_present is True:
metric_local_content[m_or_g]=make_dict_local_std_ptp(std_params, deriv_epoch_std[m_or_g][1].content, deriv_epoch_std[m_or_g][2].content, percent_noisy_flat_allowed=std_params['allow_percent_noisy_flat_epochs'], onset_times=onset_times)
#deriv_epoch_std[m_or_g][1].content is df with big std(noisy), df_epoch_std[m_or_g][2].content is df with small std(flat)
else:
metric_local_content[m_or_g]=None
simple_metric = simple_metric_basic(metric_global_name, metric_global_description, metric_global_content['mag'], metric_global_content['grad'], metric_local_name, metric_local_description, metric_local_content['mag'], metric_local_content['grad'], metric_global_content_eeg=metric_global_content.get('eeg'), metric_local_content_eeg=metric_local_content.get('eeg'))
return simple_metric
#%%
[docs]def STD_meg_qc(std_params: dict, channels: dict, chs_by_lobe: dict, dict_epochs_mg: dict, data_path:str, m_or_g_chosen: List):
"""
Main STD function. Calculates:
- Std of data for each channel over all time series.
- Channels with big std (noisy) and small std (flat) over all time series.
- Std of data for each channel in each epoch.
- Epochs with big std (noisy) and small std (flat).
Parameters
----------
std_params : dict
dictionary with parameters for std metric, originally from config file
channels : dict
dictionary with channel names for each channel type: channels['mag'] or channels['grad']
chs_by_lobe : dict
dictionary with channels grouped first by ch type and then by lobe: chs_by_lobe['mag']['Left Occipital'] or chs_by_lobe['grad']['Left Occipital']
dict_epochs_mg : dict
dictionary with epochs for each channel type: dict_epochs_mg['mag'] or dict_epochs_mg['grad']
data : mne.io.Raw
raw data
m_or_g_chosen : List
list of strings with channel types chosen by user: ['mag', 'grad'] or ['mag'] or ['grad']
Returns
-------
derivs_std : List
list of QC_derivative objects containing data frames and figures for std metric.
simple_metric_std : dict
dictionary with simple metric for std/ptp.
std_str : str
String with notes about STD for report
"""
# Load data
data, shielding_str, meg_system, _modality = load_data(data_path)
# data.load_data()
big_std_with_value_all_data = {}
small_std_with_value_all_data = {}
std_all_data = {}
derivs_std = []
derivs_list = []
noisy_flat_epochs_derivs={}
chs_by_lobe_std=copy.deepcopy(chs_by_lobe)
# copy here, because want to keep original dict unchanged.
# In principal it s good to collect all data about channel metrics there BUT if the metrics are run in parallel this might produce conflicts
# (without copying dict can be chanaged both inside+outside this function even when it is not returned.)
for m_or_g in m_or_g_chosen:
std_all_data[m_or_g] = get_std_all_data(data, channels[m_or_g])
#Add std data into channel object inside the chs_by_lobe dictionary:
for lobe in chs_by_lobe_std[m_or_g]:
for ch in chs_by_lobe_std[m_or_g][lobe]:
ch.std_overall = std_all_data[m_or_g][ch.name]
#print(ch.__dict__) #will print all the info saved in the object, more than just simply printing the object
big_std_with_value_all_data[m_or_g], small_std_with_value_all_data[m_or_g] = get_big_small_std_ptp_all_data(std_all_data[m_or_g], channels[m_or_g], std_params['std_lvl'])
if dict_epochs_mg['mag'] is not None or dict_epochs_mg['grad'] is not None or dict_epochs_mg.get('eeg') is not None: #If epochs are present
for m_or_g in m_or_g_chosen:
df_std = get_std_epochs(channels[m_or_g], dict_epochs_mg[m_or_g])
chs_by_lobe_std[m_or_g] = assign_epoched_std_ptp_to_channels(what_data='stds', chs_by_lobe=chs_by_lobe_std[m_or_g], df_std_ptp=df_std) #for easier plotting
noisy_flat_epochs_derivs[m_or_g] = get_noisy_flat_std_ptp_epochs(df_std, m_or_g, 'std', std_params['noisy_channel_multiplier'], std_params['flat_multiplier'], std_params['allow_percent_noisy_flat_epochs'])
derivs_list += noisy_flat_epochs_derivs[m_or_g]
metric_local=True
std_str = ''
else:
metric_local=False
std_str = 'STD per epoch can not be calculated because no events are present. Check stimulus channel.'
print('___MEGqc___: ', std_str)
simple_metric_std = make_simple_metric_std(std_params, big_std_with_value_all_data, small_std_with_value_all_data, channels, noisy_flat_epochs_derivs, metric_local, m_or_g_chosen, onset_times=dict_epochs_mg.get('epoch_onset_times_s'))
#Extract chs_by_lobe into a data frame
df_deriv = chs_dict_to_csv(chs_by_lobe_std, file_name_prefix = 'STDs')
derivs_std += derivs_list + df_deriv
#each deriv saved into a separate list and only at the end put together because this way they keep the right order:
#first everything about mags, then everything about grads. - in this ordr they ll be added to repot.
# TODO: Can use fig_order parameter of QC_derivative to adjust figure order in the report, if got nothing better to do XD.
return derivs_std, simple_metric_std, std_str