Module pmt_analysis.analysis.scaler

Source code
import numpy as np
import pandas as pd
import warnings
from typing import Union, Optional


class Scaler:
    """Class for the analysis of CAEN V260 scaler data loaded with `pmt_analysis.utils.input.ScalerRawData`.

    Attributes:
        data: Pandas data frame with scaler data as returned by the
            `pmt_analysis.utils.input.ScalerRawData.get_data` method.
        t_int: Data acquisition interval in seconds. Attribute of the
            `pmt_analysis.utils.input.ScalerRawData` object.
        partition_t: Array of UNIX timestamps marking the partition start times. Set at init from a list/array
            of timestamps or from an integer number of seconds giving the partition width.
    """

    def __init__(self, data: pd.DataFrame,  t_int: int,
                 partition_t: Optional[Union[np.ndarray, pd.Series, list, int]] = None):
        """Init of the Scaler class.

        Args:
            data: Pandas data frame with scaler data as returned by the
                `pmt_analysis.utils.input.ScalerRawData.get_data` method.
            t_int: Data acquisition interval in seconds. Attribute of the
                `pmt_analysis.utils.input.ScalerRawData` object.
            partition_t: Array of UNIX timestamps partitioning the data or integer number of seconds indicating the
                temporal width of the consecutive partitions to split the data into. Set the first timestamp to NaN
                to automatically use the start time of the data taking. If set to None, the full data set is handled
                as one partition.
        """
        self.data = data
        self.t_int = t_int
        if isinstance(partition_t, (list, np.ndarray, pd.Series)):
            self.partition_t = np.array(partition_t)
            # Replace a NaN first entry with the earliest timestamp in the data
            if np.isnan(self.partition_t[0]):
                self.partition_t[0] = min(self.data['timestamp'])
        elif isinstance(partition_t, int) or (partition_t is None):
            self.partition_t = partition_t
            self.get_partitions()
        else:
            raise TypeError('Parameter `partition_t` must be of type list, '
                            'numpy.ndarray, pandas.Series, int, or None.')
        self.partition_t = np.sort(self.partition_t)

    def get_partitions(self):
        """Get partition start timestamps for partitions of width `partition_t` seconds."""
        t_min = min(self.data['timestamp'])
        t_max = max(self.data['timestamp'])
        t_diff = t_max-t_min
        if self.partition_t is None:
            # Take full data set as one partition.
            self.partition_t = np.array([t_min])
        elif self.partition_t > t_diff:
            warnings.warn('Temporal extent of `data` ({}) is '
                          'smaller than `partition_t` ({}).'.format(t_diff, self.partition_t))
            # Take full data set as one partition.
            self.partition_t = np.array([t_min])
        else:
            # Find partition start timestamps. The last partition may be larger than the provided `partition_t`.
            self.partition_t = np.arange(t_diff//self.partition_t)*self.partition_t + t_min

    def get_values(self, channel: int, give_rate: bool = False, verbose: bool = True,
                   margin_start: float = 0, margin_end: float = 0) -> dict:
        """Get characteristic values (median / most probable count / count rate value, standard deviations,...)
        in partitions.

        Args:
            channel: Scaler channel number.
            give_rate: If True, use count rates in Hz; otherwise use absolute count values.
            verbose: Verbosity of the output.
            margin_start: Margin in seconds of data to exclude after start of partition.
            margin_end: Margin in seconds of data to exclude before end of partition.

        Returns:
            values_dict: Dictionary with the following keys:
                `t_start` UNIX timestamp of the partition start,
                `t_end` UNIX timestamp of the partition end,
                `bins_centers` bin centers of the histogram used for mode determination,
                `cnts` counts of the histogram used for mode determination,
                `w_window` rolling average window width used for mode determination,
                `cnts_smoothed` rolling average of the histogram used for mode determination,
                `mode` mode of the count / count rate values from the smoothed histogram,
                `median` median count / count rate value,
                `mean` mean count / count rate value,
                `perc_25` first quartile of the count / count rate values,
                `perc_75` third quartile of the count / count rate values,
                `std` standard deviation of the count / count rate values,
                `std_mean` standard error of the mean count / count rate value
        """
        values_dict = {key: [] for key in ['t_start', 't_end', 'bins_centers', 'cnts', 'w_window', 'cnts_smoothed',
                                           'mode', 'median', 'mean', 'perc_25', 'perc_75', 'std', 'std_mean']}
        t_last_partition = max(self.partition_t)
        if give_rate:
            value_name = 'ch{}_freq'.format(channel)
        else:
            value_name = 'ch{}_cnts'.format(channel)
        if margin_start < 0:
            margin_start = np.abs(margin_start)
            warnings.warn('Negative margin_start converted to its absolute value.')
        if margin_end < 0:
            margin_end = np.abs(margin_end)
            warnings.warn('Negative margin_end converted to its absolute value.')
        if (margin_end > 300) or (margin_start > 300):
            warnings.warn('Values of more than 300 s for margin_start or margin_end are discouraged.')
        for i, t_start_partition in enumerate(self.partition_t):
            values_dict['t_start'].append(t_start_partition+margin_start)
            if t_start_partition == t_last_partition:
                # Last partition: extend to the final timestamp (inclusive)
                t_start_next_partition = max(self.data['timestamp'])
                if margin_start+margin_end >= t_start_next_partition-t_start_partition:
                    raise ValueError('Margins are larger than partition.')
                data_sel = self.data[value_name][(self.data['timestamp'] >= t_start_partition+margin_start) &
                                                 (self.data['timestamp'] <= t_start_next_partition-margin_end)]
            else:
                t_start_next_partition = self.partition_t[i+1]
                if margin_start+margin_end >= t_start_next_partition-t_start_partition:
                    raise ValueError('Margins are larger than partition.')
                data_sel = self.data[value_name][(self.data['timestamp'] >= t_start_partition+margin_start) &
                                                 (self.data['timestamp'] < t_start_next_partition-margin_end)]
            values_dict['t_end'].append(t_start_next_partition-margin_end)

            # Get modes from smoothed data
            p_05 = np.floor(np.percentile(data_sel, 5))
            p_25 = np.percentile(data_sel, 25)
            p_75 = np.percentile(data_sel, 75)
            p_95 = np.ceil(np.percentile(data_sel, 95))
            # Histogram between the (floored / ceiled) 5th and 95th percentiles with bin width 1/t_int
            cnts, bins_edges = np.histogram(data_sel, bins=np.arange(p_05, p_95+1/self.t_int, 1/self.t_int))
            bins_centers = (bins_edges[1:] + bins_edges[:-1]) / 2
            values_dict['bins_centers'].append(bins_centers)
            values_dict['cnts'].append(cnts)
            # Smoothing, get window size based on ideal bin number from Sturges’ Rule
            n_bins_ideal = np.ceil(np.log2(data_sel[(data_sel >= p_25) &
                                                    (data_sel <= p_75)].shape[0]) + 1)
            w_bins_ideal = (p_75 - p_25)/n_bins_ideal
            w_window = max(3, int(5*w_bins_ideal))  # smoothing window size ~ 5 ideal bin widths
            values_dict['w_window'].append(w_window)
            cnts_smoothed = pd.Series(cnts).rolling(window=w_window, center=True, min_periods=1).mean()
            values_dict['cnts_smoothed'].append(np.array(cnts_smoothed))
            # Mode estimate: mean of the bin centers attaining the maximal smoothed count
            mode_naive = np.mean(bins_centers[cnts_smoothed == np.max(cnts_smoothed)])
            values_dict['mode'].append(mode_naive)

            # Other characteristic values
            median = np.median(data_sel)
            values_dict['median'].append(median)
            values_dict['mean'].append(np.mean(data_sel))
            std = np.std(data_sel)
            values_dict['std'].append(std)
            values_dict['std_mean'].append(std/np.sqrt(data_sel.shape[0]))
            values_dict['perc_25'].append(p_25)
            values_dict['perc_75'].append(p_75)

            if np.unique([len(values_dict[el]) for el in values_dict.keys()]).shape[0] != 1:
                warnings.warn('Entries in values_dict have different lengths.')
            if verbose:
                print('time: {:.0f}-{:.0f}, smoothing window: {}, mode: {}, '
                      'median: {}'.format(int(t_start_partition), int(t_start_next_partition), w_window,
                                          mode_naive, median))

        return values_dict

Classes

class Scaler (data: pandas.core.frame.DataFrame, t_int: int, partition_t: Union[numpy.ndarray, pandas.core.series.Series, list, int, None] = None)

Class for the analysis of CAEN V260 scaler data loaded with ScalerRawData.

Attributes

data
Pandas data frame with scaler data as returned by the ScalerRawData.get_data() method.
t_int
Data acquisition interval in seconds. Attribute of the ScalerRawData object.
partition_t
Array of UNIX timestamps marking the partition start times. Set at init from a list/array of timestamps or from an integer number of seconds giving the partition width.

Init of the Scaler class.

Args

data
Pandas data frame with scaler data as returned by the ScalerRawData.get_data() method.
t_int
Data acquisition interval in seconds. Attribute of the ScalerRawData object.
partition_t
Array of UNIX timestamps partitioning the data or integer number of seconds indicating the temporal width of the consecutive partitions to split the data into. Set the first timestamp to NaN to automatically use the start time of the data taking. If set to None, the full data set is handled as one partition.
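
A minimal usage sketch, assuming a synthetic data frame with the timestamp and ch0_cnts columns that get_values expects; in practice this frame would come from ScalerRawData.get_data().

import numpy as np
import pandas as pd

from pmt_analysis.analysis.scaler import Scaler

# Synthetic stand-in for the ScalerRawData.get_data() output:
# one reading per second over one hour, Poisson-distributed counts.
rng = np.random.default_rng(seed=42)
timestamps = np.arange(1_600_000_000, 1_600_000_000 + 3600)
data = pd.DataFrame({'timestamp': timestamps,
                     'ch0_cnts': rng.poisson(lam=100, size=timestamps.size)})

# Split the hour of data into consecutive 600 s partitions.
scaler = Scaler(data=data, t_int=1, partition_t=600)

# Alternatively, pass explicit partition start timestamps; a NaN first
# entry is replaced by the earliest timestamp in the data.
scaler_explicit = Scaler(data=data, t_int=1,
                         partition_t=[np.nan, 1_600_001_800])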

Methods

def get_partitions(self)

Get partition start timestamps for partitions of width partition_t seconds.
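
To illustrate the partitioning logic with hypothetical numbers: for data spanning t_diff seconds, t_diff // partition_t partition start timestamps are generated, and the last partition absorbs the remainder, so it can extend to almost twice partition_t.

import numpy as np

t_min, t_max = 1_600_000_000, 1_600_003_599  # hypothetical data span (3599 s)
partition_t = 1000                           # requested partition width in seconds

t_diff = t_max - t_min
starts = np.arange(t_diff // partition_t) * partition_t + t_min
print(starts - t_min)  # [   0 1000 2000]; the last partition covers 2000..3599 s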

def get_values(self, channel: int, give_rate: bool = False, verbose: bool = True, margin_start: float = 0, margin_end: float = 0) -> dict

Get characteristic values (median, mode, mean, quartiles, standard deviation, …) of the counts or count rates in each partition.

Args

channel
Scaler channel number.
give_rate
If True, use count rates in Hz; otherwise use absolute count values.
verbose
Verbosity of the output.
margin_start
Margin in seconds of data to exclude after start of partition.
margin_end
Margin in seconds of data to exclude before end of partition.

Returns

values_dict
Dictionary with the following keys:
`t_start` UNIX timestamp of the partition start,
`t_end` UNIX timestamp of the partition end,
`bins_centers` bin centers of the histogram used for mode determination,
`cnts` counts of the histogram used for mode determination,
`w_window` rolling average window width used for mode determination,
`cnts_smoothed` rolling average of the histogram used for mode determination,
`mode` mode of the count / count rate values from the smoothed histogram,
`median` median count / count rate value,
`mean` mean count / count rate value,
`perc_25` first quartile of the count / count rate values,
`perc_75` third quartile of the count / count rate values,
`std` standard deviation of the count / count rate values,
`std_mean` standard error of the mean count / count rate value
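
A hedged usage sketch, continuing the synthetic data and scaler objects from the class example above; the channel number and margins are illustrative only.

# Per-partition statistics for channel 0, using absolute counts and
# excluding 10 s of data at each partition edge.
values = scaler.get_values(channel=0, give_rate=False, verbose=False,
                           margin_start=10, margin_end=10)

for t0, t1, mode, median in zip(values['t_start'], values['t_end'],
                                values['mode'], values['median']):
    print('{}-{}: mode {:.1f}, median {:.1f}'.format(t0, t1, mode, median))

Note that the mode is taken from a rolling-average-smoothed histogram, so for multimodal or flat distributions it may differ noticeably from the median.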