import os
import mne
import configparser
import numpy as np
import pandas as pd
import random
import copy
import warnings
from typing import List
from meg_qc.calculation.objects import QC_derivative, MEG_channel
[docs]def get_all_config_params(config_file_path: str):
"""
Parse all the parameters from config and put into a python dictionary
divided by sections. Parsing approach can be changed here, which
will not affect working of other fucntions.
Parameters
----------
config_file_path: str
The path to the config file.
Returns
-------
all_qc_params: dict
A dictionary with all the parameters from the config file.
"""
all_qc_params = {}
config = configparser.ConfigParser()
config.read(config_file_path)
default_section = config['DEFAULT']
m_or_g_chosen = default_section['ch_types']
m_or_g_chosen = [chosen.strip() for chosen in m_or_g_chosen.split(",")]
if 'mag' not in m_or_g_chosen and 'grad' not in m_or_g_chosen:
print('___MEGqc___: ', 'No channels to analyze. Check parameter ch_types in config file.')
return None
#TODO: save list of mags and grads here and use later everywhere? because for CTF types are messed up.
run_STD = default_section.getboolean('STD')
run_PSD = default_section.getboolean('PSD')
run_PTP_manual = default_section.getboolean('PTP_manual')
run_PTP_auto_mne = default_section.getboolean('PTP_auto_mne')
run_ECG = default_section.getboolean('ECG')
run_EOG = default_section.getboolean('EOG')
run_Head = default_section.getboolean('Head')
run_Muscle = default_section.getboolean('Muscle')
tmin = default_section['data_crop_tmin']
tmax = default_section['data_crop_tmax']
try:
if not tmin:
tmin = 0
else:
tmin=float(tmin)
if not tmax:
tmax = None
else:
tmax=float(tmax)
default_params = dict({
'm_or_g_chosen': m_or_g_chosen,
'run_STD': run_STD,
'run_PSD': run_PSD,
'run_PTP_manual': run_PTP_manual,
'run_PTP_auto_mne': run_PTP_auto_mne,
'run_ECG': run_ECG,
'run_EOG': run_EOG,
'run_Head': run_Head,
'run_Muscle': run_Muscle,
'plot_mne_butterfly': default_section.getboolean('plot_mne_butterfly'),
'plot_interactive_time_series': default_section.getboolean('plot_interactive_time_series'),
'plot_interactive_time_series_average': default_section.getboolean('plot_interactive_time_series_average'),
'crop_tmin': tmin,
'crop_tmax': tmax})
all_qc_params['default'] = default_params
filtering_section = config['Filtering']
try:
lfreq = filtering_section.getfloat('l_freq')
except:
lfreq = None
try:
hfreq = filtering_section.getfloat('h_freq')
except:
hfreq = None
all_qc_params['Filtering'] = dict({
'apply_filtering': filtering_section.getboolean('apply_filtering'),
'l_freq': lfreq,
'h_freq': hfreq,
'method': filtering_section['method'],
'downsample_to_hz': filtering_section.getint('downsample_to_hz')})
epoching_section = config['Epoching']
stim_channel = epoching_section['stim_channel']
stim_channel = stim_channel.replace(" ", "")
stim_channel = stim_channel.split(",")
if stim_channel==['']:
stim_channel=None
epoching_params = dict({
'event_dur': epoching_section.getfloat('event_dur'),
'epoch_tmin': epoching_section.getfloat('epoch_tmin'),
'epoch_tmax': epoching_section.getfloat('epoch_tmax'),
'stim_channel': stim_channel,
'event_repeated': epoching_section['event_repeated']})
all_qc_params['Epoching'] = epoching_params
std_section = config['STD']
all_qc_params['STD'] = dict({
'std_lvl': std_section.getint('std_lvl'),
'allow_percent_noisy_flat_epochs': std_section.getfloat('allow_percent_noisy_flat_epochs'),
'noisy_channel_multiplier': std_section.getfloat('noisy_channel_multiplier'),
'flat_multiplier': std_section.getfloat('flat_multiplier'),})
psd_section = config['PSD']
freq_min = psd_section['freq_min']
freq_max = psd_section['freq_max']
if not freq_min:
freq_min = 0
else:
freq_min=float(freq_min)
if not freq_max:
freq_max = np.inf
else:
freq_max=float(freq_max)
all_qc_params['PSD'] = dict({
'freq_min': freq_min,
'freq_max': freq_max,
'psd_step_size': psd_section.getfloat('psd_step_size')})
ptp_manual_section = config['PTP_manual']
all_qc_params['PTP_manual'] = dict({
'max_pair_dist_sec': ptp_manual_section.getfloat('max_pair_dist_sec'),
'ptp_thresh_lvl': ptp_manual_section.getfloat('ptp_thresh_lvl'),
'allow_percent_noisy_flat_epochs': ptp_manual_section.getfloat('allow_percent_noisy_flat_epochs'),
'ptp_top_limit': ptp_manual_section.getfloat('ptp_top_limit'),
'ptp_bottom_limit': ptp_manual_section.getfloat('ptp_bottom_limit'),
'std_lvl': ptp_manual_section.getfloat('std_lvl'),
'noisy_channel_multiplier': ptp_manual_section.getfloat('noisy_channel_multiplier'),
'flat_multiplier': ptp_manual_section.getfloat('flat_multiplier')})
ptp_mne_section = config['PTP_auto']
all_qc_params['PTP_auto'] = dict({
'peak_m': ptp_mne_section.getfloat('peak_m'),
'flat_m': ptp_mne_section.getfloat('flat_m'),
'peak_g': ptp_mne_section.getfloat('peak_g'),
'flat_g': ptp_mne_section.getfloat('flat_g'),
'bad_percent': ptp_mne_section.getint('bad_percent'),
'min_duration': ptp_mne_section.getfloat('min_duration')})
ecg_section = config['ECG']
all_qc_params['ECG'] = dict({
'drop_bad_ch': ecg_section.getboolean('drop_bad_ch'),
'n_breaks_bursts_allowed_per_10min': ecg_section.getint('n_breaks_bursts_allowed_per_10min'),
'allowed_range_of_peaks_stds': ecg_section.getfloat('allowed_range_of_peaks_stds'),
'norm_lvl': ecg_section.getfloat('norm_lvl'),
'gaussian_sigma': ecg_section.getint('gaussian_sigma'),
'thresh_lvl_peakfinder': ecg_section.getfloat('thresh_lvl_peakfinder'),
'height_multiplier': ecg_section.getfloat('height_multiplier')})
eog_section = config['EOG']
all_qc_params['EOG'] = dict({
'n_breaks_bursts_allowed_per_10min': eog_section.getint('n_breaks_bursts_allowed_per_10min'),
'allowed_range_of_peaks_stds': eog_section.getfloat('allowed_range_of_peaks_stds'),
'norm_lvl': eog_section.getfloat('norm_lvl'),
'gaussian_sigma': ecg_section.getint('gaussian_sigma'),
'thresh_lvl_peakfinder': eog_section.getfloat('thresh_lvl_peakfinder'),})
head_section = config['Head_movement']
all_qc_params['Head'] = dict({})
muscle_section = config['Muscle']
list_thresholds = muscle_section['threshold_muscle']
#separate values in list_thresholds based on coma, remove spaces and convert them to floats:
list_thresholds = [float(i) for i in list_thresholds.split(',')]
muscle_freqs = [float(i) for i in muscle_section['muscle_freqs'].split(',')]
all_qc_params['Muscle'] = dict({
'threshold_muscle': list_thresholds,
'min_distance_between_different_muscle_events': muscle_section.getfloat('min_distance_between_different_muscle_events'),
'muscle_freqs': muscle_freqs,
'min_length_good': muscle_section.getfloat('min_length_good')})
except:
print('___MEGqc___: ', 'Invalid setting in config file! Please check instructions for each setting. \nGeneral directions: \nDon`t write any parameter as None. Don`t use quotes.\nLeaving blank is only allowed for parameters: \n- stim_channel, \n- data_crop_tmin, data_crop_tmax, \n- freq_min and freq_max in Filtering section, \n- all parameters of Filtering section if apply_filtering is set to False.')
return None
return all_qc_params
[docs]def get_internal_config_params(config_file_name: str):
"""
Parse all the parameters from config and put into a python dictionary
divided by sections. Parsing approach can be changed here, which
will not affect working of other fucntions.
These are interanl parameters, NOT to be changed by the user.
Parameters
----------
config_file_name: str
The name of the config file.
Returns
-------
internal_qc_params: dict
A dictionary with all the parameters.
"""
internal_qc_params = {}
config = configparser.ConfigParser()
config.read(config_file_name)
ecg_section = config['ECG']
internal_qc_params['ECG'] = dict({
'max_n_peaks_allowed_for_ch': ecg_section.getint('max_n_peaks_allowed_for_ch'),
'max_n_peaks_allowed_for_avg': ecg_section.getint('max_n_peaks_allowed_for_avg'),
'ecg_epoch_tmin': ecg_section.getfloat('ecg_epoch_tmin'),
'ecg_epoch_tmax': ecg_section.getfloat('ecg_epoch_tmax'),
'before_t0': ecg_section.getfloat('before_t0'),
'after_t0': ecg_section.getfloat('after_t0'),
'window_size_for_mean_threshold_method': ecg_section.getfloat('window_size_for_mean_threshold_method')})
eog_section = config['EOG']
internal_qc_params['EOG'] = dict({
'max_n_peaks_allowed_for_ch': eog_section.getint('max_n_peaks_allowed_for_ch'),
'max_n_peaks_allowed_for_avg': eog_section.getint('max_n_peaks_allowed_for_avg'),
'eog_epoch_tmin': eog_section.getfloat('eog_epoch_tmin'),
'eog_epoch_tmax': eog_section.getfloat('eog_epoch_tmax'),
'before_t0': eog_section.getfloat('before_t0'),
'after_t0': eog_section.getfloat('after_t0'),
'window_size_for_mean_threshold_method': eog_section.getfloat('window_size_for_mean_threshold_method')})
psd_section = config['PSD']
internal_qc_params['PSD'] = dict({
'method': psd_section.get('method'),
'prominence_lvl_pos_avg': psd_section.getint('prominence_lvl_pos_avg'),
'prominence_lvl_pos_channels': psd_section.getint('prominence_lvl_pos_channels')})
return internal_qc_params
[docs]def stim_data_to_df(raw: mne.io.Raw):
"""
Extract stimulus data from MEG data and put it into a pandas DataFrame.
Parameters
----------
raw : mne.io.Raw
MEG data.
Returns
-------
stim_deriv : list
List with QC_derivative object with stimulus data.
"""
stim_channels = mne.pick_types(raw.info, stim=True)
stim_channel_names = [raw.info['ch_names'][ch] for ch in stim_channels]
# Extract data for stimulus channels
stim_data, times = raw[stim_channels, :]
# Create a DataFrame with the stimulus data
stim_df = pd.DataFrame(stim_data.T, columns=stim_channel_names)
stim_df['time'] = times
#save df as QC_derivative object
stim_deriv = [QC_derivative(stim_df, 'stimulus', 'df')]
return stim_deriv
[docs]def Epoch_meg(epoching_params, data: mne.io.Raw):
"""
Epoch MEG data based on the parameters provided in the config file.
Parameters
----------
epoching_params : dict
Dictionary with parameters for epoching.
data : mne.io.Raw
MEG data to be epoched.
Returns
-------
dict_epochs_mg : dict
Dictionary with epochs for each channel type: mag, grad.
"""
event_dur = epoching_params['event_dur']
epoch_tmin = epoching_params['epoch_tmin']
epoch_tmax = epoching_params['epoch_tmax']
stim_channel = epoching_params['stim_channel']
if stim_channel is None:
picks_stim = mne.pick_types(data.info, stim=True)
stim_channel = []
for ch in picks_stim:
stim_channel.append(data.info['chs'][ch]['ch_name'])
print('___MEGqc___: ', 'Stimulus channels detected:', stim_channel)
picks_magn = data.copy().pick('mag').ch_names if 'mag' in data else None
picks_grad = data.copy().pick('grad').ch_names if 'grad' in data else None
if not stim_channel:
print('___MEGqc___: ', 'No stimulus channel detected. Setting stimulus channel to None to allow mne to detect events autamtically.')
stim_channel = None
#here for info on how None is handled by mne: https://mne.tools/stable/generated/mne.find_events.html
#even if stim is None, mne will check once more when creating events.
epochs_grad, epochs_mag = None, None
try:
events = mne.find_events(data, stim_channel=stim_channel, min_duration=event_dur)
if len(events) < 1:
print('___MEGqc___: ', 'No events with set minimum duration were found using all stimulus channels. No epoching can be done. Try different event duration in config file.')
else:
print('___MEGqc___: ', 'Events found:', len(events))
epochs_mag = mne.Epochs(data, events, picks=picks_magn, tmin=epoch_tmin, tmax=epoch_tmax, preload=True, baseline = None, event_repeated=epoching_params['event_repeated'])
epochs_grad = mne.Epochs(data, events, picks=picks_grad, tmin=epoch_tmin, tmax=epoch_tmax, preload=True, baseline = None, event_repeated=epoching_params['event_repeated'])
except: #case when we use stim_channel=None, mne checks once more, finds no other stim ch and no events and throws error:
print('___MEGqc___: ', 'No stim channels detected, no events found.')
pass #go to returning empty dict
dict_epochs_mg = {
'mag': epochs_mag,
'grad': epochs_grad}
return dict_epochs_mg
[docs]def check_chosen_ch_types(m_or_g_chosen: List, channels_objs: dict):
"""
Check if the channels which the user gave in config file to analize actually present in the data set.
Parameters
----------
m_or_g_chosen : list
List with channel types to analize: mag, grad. These are theones the user chose.
channels_objs : dict
Dictionary with channel names for each channel type: mag, grad. These are the ones present in the data set.
Returns
-------
m_or_g_chosen : list
List with channel types to analize: mag, grad.
m_or_g_skipped_str : str
String with information about which channel types were skipped.
"""
skipped_str = ''
if not any(ch in m_or_g_chosen for ch in ['mag', 'grad']):
skipped_str = "No channels to analyze. Check parameter ch_types in config file."
raise ValueError(skipped_str)
skipped_msgs = {
'mag': "There are no magnetometers in this data set: check parameter ch_types in config file. Analysis will be done only for gradiometers.",
'grad': "There are no gradiometers in this data set: check parameter ch_types in config file. Analysis will be done only for magnetometers."
}
for ch in ['mag', 'grad']:
if len(channels_objs[ch]) == 0 and ch in m_or_g_chosen:
skipped_str = skipped_msgs[ch]
print(f'___MEGqc___: {skipped_str}')
m_or_g_chosen.remove(ch)
if not any(channels_objs[ch] for ch in ['mag', 'grad']):
skipped_str = "There are no magnetometers nor gradiometers in this data set. Analysis will not be done."
raise ValueError(skipped_str)
# Now m_or_g_chosen contain only those channel types which are present in the data set and were chosen by the user.
return m_or_g_chosen, skipped_str
[docs]def choose_channels(raw: mne.io.Raw):
"""
Separate channels by 'mag' and 'grad'.
Done this way, because pick() or pick_types() sometimes gets wrong results, especialy for CTF data.
Parameters
----------
raw : mne.io.Raw
MEG data
Returns
-------
channels : dict
dict with ch names separated by mag and grad
"""
channels={'mag': [], 'grad': []}
# Loop over all channel indexes
for ch_idx, ch_name in enumerate(raw.info['ch_names']):
ch_type = mne.channel_type(raw.info, ch_idx)
if ch_type in channels:
channels[ch_type].append(ch_name)
return channels
[docs]def change_ch_type_CTF(raw, channels):
"""
For CTF data channels types and units need to be chnaged from mag to grad.
Parameters
----------
channels : dict
dict with ch names separated by mag and grad
Returns
-------
channels : dict
dict with ch names separated by mag and grad UPDATED
"""
# Create a copy of the channels['mag'] list to iterate over
mag_channels_copy = channels['mag'][:]
for ch_name in mag_channels_copy:
with warnings.catch_warnings():
warnings.simplefilter("ignore")
raw.set_channel_types({ch_name: 'grad'})
channels['grad'].append(ch_name)
# Remove from mag list
channels['mag'].remove(ch_name)
print('___MEGqc___: Types of channels changed from mag to grad for CTF data.')
return channels, raw
[docs]def load_data(file_path):
"""
Load MEG data from a file. It can be a CTF data or a FIF file.
Parameters
----------
file_path : str
Path to the fif file with MEG data.
Returns
-------
raw : mne.io.Raw
MEG data.
shielding_str : str
String with information about active shielding.
"""
shielding_str = ''
meg_system = None
if os.path.isdir(file_path) and file_path.endswith('.ds'):
# It's a CTF data directory
print("___MEGqc___: ", "Loading CTF data...")
raw = mne.io.read_raw_ctf(file_path, preload=True)
meg_system = 'CTF'
elif os.path.isfile(file_path) and file_path.endswith('.fif'):
# It's a FIF file
meg_system = 'Triux'
print("___MEGqc___: ", "Loading FIF data...")
try:
raw = mne.io.read_raw_fif(file_path, on_split_missing='ignore')
except:
raw = mne.io.read_raw_fif(file_path, allow_maxshield=True, on_split_missing='ignore')
shielding_str=''' <p>This fif file contains Internal Active Shielding data. Quality measurements calculated on this data should not be compared to the measuremnts calculated on the data without active shileding, since in the current case invironmental noise reduction was already partially performed by shileding, which normally should not be done before assesing the quality.</p>'''
else:
raise ValueError("Unsupported file format or file does not exist. The pipeline works with CTF data directories and FIF files.")
return raw, shielding_str, meg_system
[docs]def add_3d_ch_locations(raw, channels_objs):
"""
Add channel locations to the MEG channels objects.
Parameters
----------
raw : mne.io.Raw
MEG data.
channels_objs : dict
Dictionary with MEG channels.
Returns
-------
channels_objs : dict
Dictionary with MEG channels with added locations.
"""
# Create a dictionary to store the channel locations
ch_locs = {ch['ch_name']: ch['loc'][:3] for ch in raw.info['chs']}
#why [:3]? to Get only the x, y, z coordinates (first 3 values), theer are also rotations, etc storred in loc.
# Iterate through the channel names and add the locations
for key, value in channels_objs.items():
for ch in value:
ch.loc = ch_locs[ch.name]
return channels_objs
def add_CTF_lobes(channels_objs):
# Initialize dictionary to store channels by lobe and side
lobes_ctf = {
'Left Frontal': [],
'Right Frontal': [],
'Left Temporal': [],
'Right Temporal': [],
'Left Parietal': [],
'Right Parietal': [],
'Left Occipital': [],
'Right Occipital': [],
'Central': [],
'Reference': [],
'EEG/EOG/ECG': [],
'Extra': [] # Add 'Extra' lobe
}
# Iterate through the channel names and categorize them
for key, value in channels_objs.items():
for ch in value:
categorized = False # Track if the channel is categorized
# Magnetometers (assuming they start with 'M')
#Even though they all have to be grads for CTF!!!
if ch.name.startswith('MLF'): # Left Frontal
lobes_ctf['Left Frontal'].append(ch.name)
categorized = True
elif ch.name.startswith('MRF'): # Right Frontal
lobes_ctf['Right Frontal'].append(ch.name)
categorized = True
elif ch.name.startswith('MLT'): # Left Temporal
lobes_ctf['Left Temporal'].append(ch.name)
categorized = True
elif ch.name.startswith('MRT'): # Right Temporal
lobes_ctf['Right Temporal'].append(ch.name)
categorized = True
elif ch.name.startswith('MLP'): # Left Parietal
lobes_ctf['Left Parietal'].append(ch.name)
categorized = True
elif ch.name.startswith('MRP'): # Right Parietal
lobes_ctf['Right Parietal'].append(ch.name)
categorized = True
elif ch.name.startswith('MLO'): # Left Occipital
lobes_ctf['Left Occipital'].append(ch.name)
categorized = True
elif ch.name.startswith('MRO'): # Right Occipital
lobes_ctf['Right Occipital'].append(ch.name)
categorized = True
elif ch.name.startswith('MLC') or ch.name.startswith('MRC'): # Central (Midline)
lobes_ctf['Central'].append(ch.name)
categorized = True
elif ch.name.startswith('MZ'): # Reference Sensors
lobes_ctf['Reference'].append(ch.name)
categorized = True
elif ch.name in ['Cz', 'Pz', 'ECG', 'VEOG', 'HEOG']: # EEG/EOG/ECG channels
lobes_ctf['EEG/EOG/ECG'].append(ch.name)
categorized = True
# Gradiometers (assuming they have a different prefix or suffix, such as 'G')
elif ch.name.startswith('GLF'): # Left Frontal Gradiometers
lobes_ctf['Left Frontal'].append(ch.name)
categorized = True
elif ch.name.startswith('GRF'): # Right Frontal Gradiometers
lobes_ctf['Right Frontal'].append(ch.name)
categorized = True
elif ch.name.startswith('GLT'): # Left Temporal Gradiometers
lobes_ctf['Left Temporal'].append(ch.name)
categorized = True
elif ch.name.startswith('GRT'): # Right Temporal Gradiometers
lobes_ctf['Right Temporal'].append(ch.name)
categorized = True
elif ch.name.startswith('GLP'): # Left Parietal Gradiometers
lobes_ctf['Left Parietal'].append(ch.name)
categorized = True
elif ch.name.startswith('GRP'): # Right Parietal Gradiometers
lobes_ctf['Right Parietal'].append(ch.name)
categorized = True
elif ch.name.startswith('GLO'): # Left Occipital Gradiometers
lobes_ctf['Left Occipital'].append(ch.name)
categorized = True
elif ch.name.startswith('GRO'): # Right Occipital Gradiometers
lobes_ctf['Right Occipital'].append(ch.name)
categorized = True
elif ch.name.startswith('GLC') or ch.name.startswith('GRC'): # Central (Midline) Gradiometers
lobes_ctf['Central'].append(ch.name)
categorized = True
# If the channel was not categorized, add it to 'Extra'
if not categorized:
lobes_ctf['Extra'].append(ch.name)
lobe_colors = {
'Left Frontal': '#1f77b4',
'Right Frontal': '#ff7f0e',
'Left Temporal': '#2ca02c',
'Right Temporal': '#9467bd',
'Left Parietal': '#e377c2',
'Right Parietal': '#d62728',
'Left Occipital': '#bcbd22',
'Right Occipital': '#17becf',
'Central': '#8c564b',
'Reference': '#7f7f7f',
'EEG/EOG/ECG': '#bcbd22',
'Extra': '#d3d3d3'
}
lobes_color_coding_str = 'Color coding by lobe is applied as per CTF system.'
for key, value in channels_objs.items():
for ch in value:
for lobe in lobes_ctf.keys():
if ch.name in lobes_ctf[lobe]:
ch.lobe = lobe
ch.lobe_color = lobe_colors[lobe]
return channels_objs, lobes_color_coding_str
def add_Triux_lobes(channels_objs):
lobes_treux = {
'Left Frontal': ['MEG0621', 'MEG0622', 'MEG0623', 'MEG0821', 'MEG0822', 'MEG0823', 'MEG0121', 'MEG0122', 'MEG0123', 'MEG0341', 'MEG0342', 'MEG0343', 'MEG0321', 'MEG0322', 'MEG0323', 'MEG0331', 'MEG0332', 'MEG0333', 'MEG0643', 'MEG0642', 'MEG0641', 'MEG0611', 'MEG0612', 'MEG0613', 'MEG0541', 'MEG0542', 'MEG0543', 'MEG0311', 'MEG0312', 'MEG0313', 'MEG0511', 'MEG0512', 'MEG0513', 'MEG0521', 'MEG0522', 'MEG0523', 'MEG0531', 'MEG0532', 'MEG0533'],
'Right Frontal': ['MEG0811', 'MEG0812', 'MEG0813', 'MEG0911', 'MEG0912', 'MEG0913', 'MEG0921', 'MEG0922', 'MEG0923', 'MEG0931', 'MEG0932', 'MEG0933', 'MEG0941', 'MEG0942', 'MEG0943', 'MEG1011', 'MEG1012', 'MEG1013', 'MEG1021', 'MEG1022', 'MEG1023', 'MEG1031', 'MEG1032', 'MEG1033', 'MEG1211', 'MEG1212', 'MEG1213', 'MEG1221', 'MEG1222', 'MEG1223', 'MEG1231', 'MEG1232', 'MEG1233', 'MEG1241', 'MEG1242', 'MEG1243', 'MEG1411', 'MEG1412', 'MEG1413'],
'Left Temporal': ['MEG0111', 'MEG0112', 'MEG0113', 'MEG0131', 'MEG0132', 'MEG0133', 'MEG0141', 'MEG0142', 'MEG0143', 'MEG0211', 'MEG0212', 'MEG0213', 'MEG0221', 'MEG0222', 'MEG0223', 'MEG0231', 'MEG0232', 'MEG0233', 'MEG0241', 'MEG0242', 'MEG0243', 'MEG1511', 'MEG1512', 'MEG1513', 'MEG1521', 'MEG1522', 'MEG1523', 'MEG1531', 'MEG1532', 'MEG1533', 'MEG1541', 'MEG1542', 'MEG1543', 'MEG1611', 'MEG1612', 'MEG1613', 'MEG1621', 'MEG1622', 'MEG1623'],
'Right Temporal': ['MEG1311', 'MEG1312', 'MEG1313', 'MEG1321', 'MEG1322', 'MEG1323', 'MEG1421', 'MEG1422', 'MEG1423', 'MEG1431', 'MEG1432', 'MEG1433', 'MEG1441', 'MEG1442', 'MEG1443', 'MEG1341', 'MEG1342', 'MEG1343', 'MEG1331', 'MEG1332', 'MEG1333', 'MEG2611', 'MEG2612', 'MEG2613', 'MEG2621', 'MEG2622', 'MEG2623', 'MEG2631', 'MEG2632', 'MEG2633', 'MEG2641', 'MEG2642', 'MEG2643', 'MEG2411', 'MEG2412', 'MEG2413', 'MEG2421', 'MEG2422', 'MEG2423'],
'Left Parietal': ['MEG0411', 'MEG0412', 'MEG0413', 'MEG0421', 'MEG0422', 'MEG0423', 'MEG0431', 'MEG0432', 'MEG0433', 'MEG0441', 'MEG0442', 'MEG0443', 'MEG0711', 'MEG0712', 'MEG0713', 'MEG0741', 'MEG0742', 'MEG0743', 'MEG1811', 'MEG1812', 'MEG1813', 'MEG1821', 'MEG1822', 'MEG1823', 'MEG1831', 'MEG1832', 'MEG1833', 'MEG1841', 'MEG1842', 'MEG1843', 'MEG0631', 'MEG0632', 'MEG0633', 'MEG1631', 'MEG1632', 'MEG1633', 'MEG2011', 'MEG2012', 'MEG2013'],
'Right Parietal': ['MEG1041', 'MEG1042', 'MEG1043', 'MEG1111', 'MEG1112', 'MEG1113', 'MEG1121', 'MEG1122', 'MEG1123', 'MEG1131', 'MEG1132', 'MEG1133', 'MEG2233', 'MEG1141', 'MEG1142', 'MEG1143', 'MEG2243', 'MEG0721', 'MEG0722', 'MEG0723', 'MEG0731', 'MEG0732', 'MEG0733', 'MEG2211', 'MEG2212', 'MEG2213', 'MEG2221', 'MEG2222', 'MEG2223', 'MEG2231', 'MEG2232', 'MEG2233', 'MEG2241', 'MEG2242', 'MEG2243', 'MEG2021', 'MEG2022', 'MEG2023', 'MEG2441', 'MEG2442', 'MEG2443'],
'Left Occipital': ['MEG1641', 'MEG1642', 'MEG1643', 'MEG1711', 'MEG1712', 'MEG1713', 'MEG1721', 'MEG1722', 'MEG1723', 'MEG1731', 'MEG1732', 'MEG1733', 'MEG1741', 'MEG1742', 'MEG1743', 'MEG1911', 'MEG1912', 'MEG1913', 'MEG1921', 'MEG1922', 'MEG1923', 'MEG1931', 'MEG1932', 'MEG1933', 'MEG1941', 'MEG1942', 'MEG1943', 'MEG2041', 'MEG2042', 'MEG2043', 'MEG2111', 'MEG2112', 'MEG2113', 'MEG2141', 'MEG2142', 'MEG2143'],
'Right Occipital': ['MEG2031', 'MEG2032', 'MEG2033', 'MEG2121', 'MEG2122', 'MEG2123', 'MEG2311', 'MEG2312', 'MEG2313', 'MEG2321', 'MEG2322', 'MEG2323', 'MEG2331', 'MEG2332', 'MEG2333', 'MEG2341', 'MEG2342', 'MEG2343', 'MEG2511', 'MEG2512', 'MEG2513', 'MEG2521', 'MEG2522', 'MEG2523', 'MEG2531', 'MEG2532', 'MEG2533', 'MEG2541', 'MEG2542', 'MEG2543', 'MEG2431', 'MEG2432', 'MEG2433', 'MEG2131', 'MEG2132', 'MEG2133'],
'Extra': []} # Add 'Extra' lobe
# These were just for Aarons presentation:
# lobes_treux = {
# 'Left Frontal': ['MEG0621', 'MEG0622', 'MEG0623', 'MEG0821', 'MEG0822', 'MEG0823', 'MEG0121', 'MEG0122', 'MEG0123', 'MEG0341', 'MEG0342', 'MEG0343', 'MEG0321', 'MEG0322', 'MEG0323', 'MEG0331', 'MEG0332', 'MEG0333', 'MEG0643', 'MEG0642', 'MEG0641', 'MEG0541', 'MEG0542', 'MEG0543', 'MEG0311', 'MEG0312', 'MEG0313', 'MEG0511', 'MEG0512', 'MEG0513', 'MEG0521', 'MEG0522', 'MEG0523', 'MEG0531', 'MEG0532', 'MEG0533'],
# 'Right Frontal': ['MEG0811', 'MEG0812', 'MEG0813', 'MEG0911', 'MEG0912', 'MEG0913', 'MEG0921', 'MEG0922', 'MEG0923', 'MEG0931', 'MEG0932', 'MEG0933', 'MEG0941', 'MEG0942', 'MEG0943', 'MEG1011', 'MEG1012', 'MEG1013', 'MEG1021', 'MEG1022', 'MEG1023', 'MEG1031', 'MEG1032', 'MEG1033', 'MEG1211', 'MEG1212', 'MEG1213', 'MEG1221', 'MEG1222', 'MEG1223', 'MEG1231', 'MEG1232', 'MEG1233', 'MEG1241', 'MEG1242', 'MEG1243', 'MEG1411', 'MEG1412', 'MEG1413'],
# 'Left Temporal': ['MEG0111', 'MEG0112', 'MEG0113', 'MEG0131', 'MEG0132', 'MEG0133', 'MEG0141', 'MEG0142', 'MEG0143', 'MEG0211', 'MEG0212', 'MEG0213', 'MEG0221', 'MEG0222', 'MEG0223', 'MEG0231', 'MEG0232', 'MEG0233', 'MEG0241', 'MEG0242', 'MEG0243', 'MEG1511', 'MEG1512', 'MEG1513', 'MEG1521', 'MEG1522', 'MEG1523', 'MEG1531', 'MEG1532', 'MEG1533', 'MEG1541', 'MEG1542', 'MEG1543', 'MEG1611', 'MEG1612', 'MEG1613', 'MEG1621', 'MEG1622', 'MEG1623'],
# 'Right Temporal': ['MEG1311', 'MEG1312', 'MEG1313', 'MEG1321', 'MEG1322', 'MEG1323', 'MEG1421', 'MEG1422', 'MEG1423', 'MEG1431', 'MEG1432', 'MEG1433', 'MEG1441', 'MEG1442', 'MEG1443', 'MEG1341', 'MEG1342', 'MEG1343', 'MEG1331', 'MEG1332', 'MEG1333', 'MEG2611', 'MEG2612', 'MEG2613', 'MEG2621', 'MEG2622', 'MEG2623', 'MEG2631', 'MEG2632', 'MEG2633', 'MEG2641', 'MEG2642', 'MEG2643', 'MEG2411', 'MEG2412', 'MEG2413', 'MEG2421', 'MEG2422', 'MEG2423'],
# 'Left Parietal': ['MEG0411', 'MEG0412', 'MEG0413', 'MEG0421', 'MEG0422', 'MEG0423', 'MEG0431', 'MEG0432', 'MEG0433', 'MEG0441', 'MEG0442', 'MEG0443', 'MEG0711', 'MEG0712', 'MEG0713', 'MEG0741', 'MEG0742', 'MEG0743', 'MEG1811', 'MEG1812', 'MEG1813', 'MEG1821', 'MEG1822', 'MEG1823', 'MEG1831', 'MEG1832', 'MEG1833', 'MEG1841', 'MEG1842', 'MEG1843', 'MEG0631', 'MEG0632', 'MEG0633', 'MEG1631', 'MEG1632', 'MEG1633', 'MEG2011', 'MEG2012', 'MEG2013'],
# 'Right Parietal': ['MEG1041', 'MEG1042', 'MEG1043', 'MEG1111', 'MEG1112', 'MEG1113', 'MEG1121', 'MEG1122', 'MEG1123', 'MEG1131', 'MEG1132', 'MEG1133', 'MEG2233', 'MEG1141', 'MEG1142', 'MEG1143', 'MEG2243', 'MEG0721', 'MEG0722', 'MEG0723', 'MEG0731', 'MEG0732', 'MEG0733', 'MEG2211', 'MEG2212', 'MEG2213', 'MEG2221', 'MEG2222', 'MEG2223', 'MEG2231', 'MEG2232', 'MEG2233', 'MEG2241', 'MEG2242', 'MEG2243', 'MEG2021', 'MEG2022', 'MEG2023', 'MEG2441', 'MEG2442', 'MEG2443'],
# 'Left Occipital': ['MEG1641', 'MEG1642', 'MEG1643', 'MEG1711', 'MEG1712', 'MEG1713', 'MEG1721', 'MEG1722', 'MEG1723', 'MEG1731', 'MEG1732', 'MEG1733', 'MEG1741', 'MEG1742', 'MEG1743', 'MEG1911', 'MEG1912', 'MEG1913', 'MEG1921', 'MEG1922', 'MEG1923', 'MEG1931', 'MEG1932', 'MEG1933', 'MEG1941', 'MEG1942', 'MEG1943', 'MEG2041', 'MEG2042', 'MEG2043', 'MEG2111', 'MEG2112', 'MEG2113', 'MEG2141', 'MEG2142', 'MEG2143', 'MEG2031', 'MEG2032', 'MEG2033', 'MEG2121', 'MEG2122', 'MEG2123', 'MEG2311', 'MEG2312', 'MEG2313', 'MEG2321', 'MEG2322', 'MEG2323', 'MEG2331', 'MEG2332', 'MEG2333', 'MEG2341', 'MEG2342', 'MEG2343', 'MEG2511', 'MEG2512', 'MEG2513', 'MEG2521', 'MEG2522', 'MEG2523', 'MEG2531', 'MEG2532', 'MEG2533', 'MEG2541', 'MEG2542', 'MEG2543', 'MEG2431', 'MEG2432', 'MEG2433', 'MEG2131', 'MEG2132', 'MEG2133'],
# 'Right Occipital': ['MEG0611', 'MEG0612', 'MEG0613']}
# #Now add to lobes_treux also the name of each channel with space in the middle:
for lobe in lobes_treux.keys():
lobes_treux[lobe] += [channel[:-4]+' '+channel[-4:] for channel in lobes_treux[lobe]]
lobe_colors = {
'Left Frontal': '#1f77b4',
'Right Frontal': '#ff7f0e',
'Left Temporal': '#2ca02c',
'Right Temporal': '#9467bd',
'Left Parietal': '#e377c2',
'Right Parietal': '#d62728',
'Left Occipital': '#bcbd22',
'Right Occipital': '#17becf',
'Extra': '#d3d3d3'}
# These were just for Aarons presentation:
# lobe_colors = {
# 'Left Frontal': '#2ca02c',
# 'Right Frontal': '#2ca02c',
# 'Left Temporal': '#2ca02c',
# 'Right Temporal': '#2ca02c',
# 'Left Parietal': '#2ca02c',
# 'Right Parietal': '#2ca02c',
# 'Left Occipital': '#2ca02c',
# 'Right Occipital': '#d62728'}
#loop over all values in the dictionary:
lobes_color_coding_str='Color coding by lobe is applied as per Treux system. Separation by lobes based on Y. Hu et al. "Partial Least Square Aided Beamforming Algorithm in Magnetoencephalography Source Imaging", 2018. '
for key, value in channels_objs.items():
for ch in value:
categorized = False
for lobe in lobes_treux.keys():
if ch.name in lobes_treux[lobe]:
ch.lobe = lobe
ch.lobe_color = lobe_colors[lobe]
categorized = True
break
# If the channel was not categorized, assign it to 'extra' lobe
if not categorized:
ch.lobe = 'Extra'
ch.lobe_color = lobe_colors[lobe]
return channels_objs, lobes_color_coding_str
[docs]def assign_channels_properties(channels_short: dict, meg_system: str):
"""
Assign lobe area to each channel according to the lobe area dictionary + the color for plotting + channel location.
Can later try to make this function a method of the MEG_channels class.
At the moment not possible because it needs to know the total number of channels to figure which meg system to use for locations. And MEG_channels class is created for each channel separately.
Parameters
----------
channels : dict
dict with channels names like: {'mag': [...], 'grad': [...]}
meg_system: str
CTF, Triux, None...
Returns
-------
channels_objs : dict
Dictionary with channel names for each channel type: mag, grad. Each channel has assigned lobe area and color for plotting + channel location.
lobes_color_coding_str : str
A string with information about the color coding of the lobes.
"""
channels_full = copy.deepcopy(channels_short)
# for understanding how the locations are obtained. They can be extracted as:
# mag_locs = raw.copy().pick('mag').info['chs']
# mag_pos = [ch['loc'][:3] for ch in mag_locs]
# (XYZ locations are first 3 digit in the ch['loc'] where ch is 1 sensor in raw.info['chs'])
# Assign lobe labels to the channels:
if meg_system.upper() == 'TRIUX' and len(channels_full['mag']) == 102 and len(channels_full['grad']) == 204:
#for 306 channel data in Elekta/Neuromag Treux system
channels_full, lobes_color_coding_str = add_Triux_lobes(channels_full)
#assign 'TRIUX' to all channels:
for key, value in channels_full.items():
for ch in value:
ch.system = 'TRIUX'
elif meg_system.upper() == 'CTF':
channels_full, lobes_color_coding_str = add_CTF_lobes(channels_full)
#assign 'CTF' to all channels:
for key, value in channels_full.items():
for ch in value:
ch.system = 'CTF'
else:
lobes_color_coding_str='For MEG systems other than MEGIN Triux or CTF color coding by lobe is not applied.'
lobe_colors = ['#1f77b4', '#ff7f0e', '#2ca02c', '#9467bd', '#e377c2', '#d62728', '#bcbd22', '#17becf']
print('___MEGqc___: ' + lobes_color_coding_str)
for key, value in channels_full.items():
for ch in value:
ch.lobe = 'All channels'
#take random color from lobe_colors:
ch.lobe_color = random.choice(lobe_colors)
ch.system = 'OTHER'
#sort channels by name:
for key, value in channels_full.items():
channels_full[key] = sorted(value, key=lambda x: x.name)
return channels_full, lobes_color_coding_str
[docs]def sort_channels_by_lobe(channels_objs: dict):
""" Sorts channels by lobes.
Parameters
----------
channels_objs : dict
A dictionary of channel objects.
Returns
-------
chs_by_lobe : dict
A dictionary of channels sorted by ch type and lobe.
"""
chs_by_lobe = {}
for m_or_g in channels_objs:
#put all channels into separate lists based on their lobes:
lobes_names=list(set([ch.lobe for ch in channels_objs[m_or_g]]))
lobes_dict = {key: [] for key in lobes_names}
#fill the dict with channels:
for ch in channels_objs[m_or_g]:
lobes_dict[ch.lobe].append(ch)
# Sort the dictionary by lobes names (by the second word in the key, if it exists)
chs_by_lobe[m_or_g] = dict(sorted(lobes_dict.items(), key=lambda x: x[0].split()[1] if len(x[0].split()) > 1 else ''))
return chs_by_lobe
[docs]def initial_processing(default_settings: dict, filtering_settings: dict, epoching_params:dict, file_path: str):
"""
Here all the initial actions needed to analyse MEG data are done:
- read fif file,
- separate mags and grads names into 2 lists,
- crop the data if needed,
- filter and downsample the data,
- epoch the data.
Parameters
----------
default_settings : dict
Dictionary with default settings for MEG QC.
filtering_settings : dict
Dictionary with parameters for filtering.
epoching_params : dict
Dictionary with parameters for epoching.
file_path : str
Path to the fif file with MEG data.
Returns
-------
dict_epochs_mg : dict
Dictionary with epochs for each channel type: mag, grad.
chs_by_lobe : dict
Dictionary with channel objects for each channel type: mag, grad. And by lobe. Each obj hold info about the channel name,
lobe area and color code, locations and (in the future) pther info, like: if it has noise of any sort.
channels : dict
Dictionary with channel names for each channel type: mag, grad.
raw_crop_filtered : mne.io.Raw
Filtered and cropped MEG data.
raw_crop_filtered_resampled : mne.io.Raw
Filtered, cropped and resampled MEG data.
raw_cropped : mne.io.Raw
Cropped MEG data.
raw : mne.io.Raw
MEG data.
info_derivs : list
List with QC_derivative objects with MNE info object.
shielding_str : str
String with information about active shielding.
epoching_str : str
String with information about epoching.
sensors_derivs : list
List with data frames with sensors info.
m_or_g_chosen : list
List with channel types to analize: mag, grad.
m_or_g_skipped_str : str
String with information about which channel types were skipped.
lobes_color_coding_str : str
String with information about color coding for lobes.
resample_str : str
String with information about resampling.
"""
print('___MEGqc___: ', 'Reading data from file:', file_path)
raw, shielding_str, meg_system = load_data(file_path)
# Working with channels:
channels = choose_channels(raw)
if meg_system == 'CTF': #ONLY FOR CTF we do this! Return raw with changed channel types.
channels, raw = change_ch_type_CTF(raw, channels)
#Turn channel names into objects:
channels_objs = {key: [MEG_channel(name=ch_name, type=key) for ch_name in value] for key, value in channels.items()}
#Assign channels properties:
channels_objs, lobes_color_coding_str = assign_channels_properties(channels_objs, meg_system)
#Add channel locations:
channels_objs = add_3d_ch_locations(raw, channels_objs)
#Check if there are channels to analyze according to info in config file:
m_or_g_chosen, m_or_g_skipped_str = check_chosen_ch_types(m_or_g_chosen=default_settings['m_or_g_chosen'], channels_objs=channels_objs)
#Sort channels by lobe - this will be used often for plotting
chs_by_lobe = sort_channels_by_lobe(channels_objs)
print('___MEGqc___: ', 'Channels sorted by lobe.')
info = raw.info
info_derivs = [QC_derivative(content = info, name = 'RawInfo', content_type = 'info', fig_order=-1)]
#crop the data to calculate faster:
tmax_possible = raw.times[-1]
tmax=default_settings['crop_tmax']
if tmax is None or tmax > tmax_possible:
tmax = tmax_possible
raw_cropped = raw.copy().crop(tmin=default_settings['crop_tmin'], tmax=tmax)
#When resampling for plotting, cropping or anything else you don't need permanent in raw inside any functions - always do raw_new=raw.copy() not just raw_new=raw. The last command doesn't create a new object, the whole raw will be changed and this will also be passed to other functions even if you don't return the raw.
stim_deriv = stim_data_to_df(raw_cropped)
#Data filtering:
raw_cropped_filtered = raw_cropped.copy()
if filtering_settings['apply_filtering'] is True:
raw_cropped.load_data() #Data has to be loaded into mememory before filetering:
raw_cropped_filtered = raw_cropped.copy()
#if filtering_settings['h_freq'] is higher than the Nyquist frequency, set it to Nyquist frequency:
if filtering_settings['h_freq'] > raw_cropped_filtered.info['sfreq']/2 - 1:
filtering_settings['h_freq'] = raw_cropped_filtered.info['sfreq']/2 - 1
print('___MEGqc___: ', 'High frequency for filtering is higher than Nyquist frequency. High frequency was set to Nyquist frequency:', filtering_settings['h_freq'])
raw_cropped_filtered.filter(l_freq=filtering_settings['l_freq'], h_freq=filtering_settings['h_freq'], picks='meg', method=filtering_settings['method'], iir_params=None)
print('___MEGqc___: ', 'Data filtered from', filtering_settings['l_freq'], 'to', filtering_settings['h_freq'], 'Hz.')
if filtering_settings['downsample_to_hz'] is False:
raw_cropped_filtered_resampled = raw_cropped_filtered.copy()
resample_str = 'Data not resampled. '
print('___MEGqc___: ', resample_str)
elif filtering_settings['downsample_to_hz'] >= filtering_settings['h_freq']*5:
raw_cropped_filtered_resampled = raw_cropped_filtered.copy().resample(sfreq=filtering_settings['downsample_to_hz'])
resample_str = 'Data resampled to ' + str(filtering_settings['downsample_to_hz']) + ' Hz. '
print('___MEGqc___: ', resample_str)
else:
raw_cropped_filtered_resampled = raw_cropped_filtered.copy().resample(sfreq=filtering_settings['h_freq']*5)
#frequency to resample is 5 times higher than the maximum chosen frequency of the function
resample_str = 'Chosen "downsample_to_hz" value set was too low, it must be at least 5 time higher than the highest filer frequency. Data resampled to ' + str(filtering_settings['h_freq']*5) + ' Hz. '
print('___MEGqc___: ', resample_str)
else:
print('___MEGqc___: ', 'Data not filtered.')
#And downsample:
if filtering_settings['downsample_to_hz'] is not False:
raw_cropped_filtered_resampled = raw_cropped_filtered.copy().resample(sfreq=filtering_settings['downsample_to_hz'])
if filtering_settings['downsample_to_hz'] < 500:
resample_str = 'Data resampled to ' + str(filtering_settings['downsample_to_hz']) + ' Hz. Keep in mind: resampling to less than 500Hz is not recommended, since it might result in high frequency data loss (for example of the CHPI coils signal. '
print('___MEGqc___: ', resample_str)
else:
resample_str = 'Data resampled to ' + str(filtering_settings['downsample_to_hz']) + ' Hz. '
print('___MEGqc___: ', resample_str)
else:
raw_cropped_filtered_resampled = raw_cropped_filtered.copy()
resample_str = 'Data not resampled. '
print('___MEGqc___: ', resample_str)
#Apply epoching: USE NON RESAMPLED DATA. Or should we resample after epoching?
# Since sampling freq is 1kHz and resampling is 500Hz, it s not that much of a win...
dict_epochs_mg = Epoch_meg(epoching_params, data=raw_cropped_filtered)
epoching_str = ''
if dict_epochs_mg['mag'] is None and dict_epochs_mg['grad'] is None:
epoching_str = ''' <p>No epoching could be done in this data set: no events found. Quality measurement were only performed on the entire time series. If this was not expected, try: 1) checking the presence of stimulus channel in the data set, 2) setting stimulus channel explicitly in config file, 3) setting different event duration in config file.</p><br></br>'''
resample_str = '<p>' + resample_str + '</p>'
#Extract chs_by_lobe into a data frame
sensors_derivs = chs_dict_to_csv(chs_by_lobe, file_name_prefix = 'Sensors')
return meg_system, dict_epochs_mg, chs_by_lobe, channels, raw_cropped_filtered, raw_cropped_filtered_resampled, raw_cropped, raw, info_derivs, stim_deriv, shielding_str, epoching_str, sensors_derivs, m_or_g_chosen, m_or_g_skipped_str, lobes_color_coding_str, resample_str
[docs]def chs_dict_to_csv(chs_by_lobe: dict, file_name_prefix: str):
"""
Convert dictionary with channels objects to a data frame and save it as a csv file.
Parameters
----------
chs_by_lobe : dict
Dictionary with channel objects for each channel type: mag, grad. And by lobe. Each obj hold info about the channel name,
lobe area and color code, locations and (in the future) pther info, like: if it has noise of any sort.
file_name_prefix : str
Prefix for the file name. Example: 'Sensors' will result in file name 'Sensors.csv'.
Returns
-------
df_deriv : list
List with data frames with sensors info.
"""
#Extract chs_by_lobe into a data frame
chs_by_lobe_df = {k1: {k2: pd.concat([channel.to_df() for channel in v2]) for k2, v2 in v1.items()} for k1, v1 in chs_by_lobe.items()}
its = []
for ch_type, content in chs_by_lobe_df.items():
for lobe, items in content.items():
its.append(items)
df_fin = pd.concat(its)
# if df already contains columns like 'STD epoch_' with numbers, 'STD epoch' needs to be removed from the data frame:
if 'STD epoch' in df_fin and any(col.startswith('STD epoch_') and col[10:].isdigit() for col in df_fin.columns):
# If there are, drop the 'STD epoch' column
df_fin = df_fin.drop(columns='STD epoch')
if 'PtP epoch' in df_fin and any(col.startswith('PtP epoch_') and col[10:].isdigit() for col in df_fin.columns):
# If there are, drop the 'PtP epoch' column
df_fin = df_fin.drop(columns='PtP epoch')
if 'PSD' in df_fin and any(col.startswith('PSD_') and col[4:].isdigit() for col in df_fin.columns):
# If there are, drop the 'STD epoch' column
df_fin = df_fin.drop(columns='PSD')
if 'ECG' in df_fin and any(col.startswith('ECG_') and col[4:].isdigit() for col in df_fin.columns):
# If there are, drop the 'STD epoch' column
df_fin = df_fin.drop(columns='ECG')
if 'EOG' in df_fin and any(col.startswith('EOG_') and col[4:].isdigit() for col in df_fin.columns):
# If there are, drop the 'STD epoch' column
df_fin = df_fin.drop(columns='EOG')
df_deriv = [QC_derivative(content = df_fin, name = file_name_prefix, content_type = 'df')]
return df_deriv