Source code for bt4vt.core

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

# Created on 01-05-2022
# @author: wiebket, AnnaLesch

import pandas as pd
import numpy as np
import os
import sys
from datetime import datetime
from pathlib import Path
from .dataio import load_config, load_data, write_data
from .evaluate import evaluate_scores
from .groups import split_scores_by_speaker_groups
from .metrics import compute_metrics_ratios
from .dataset_evaluate import evaluate_scores_by_speaker_groups


class BiasTest:
    """
    Elementary class for implementing bias tests.

    Subclasses are expected to override :py:meth:`run_tests`; :py:meth:`plot`
    and :py:meth:`evaluate_dataset` are optional hooks. All three do nothing
    and return ``None`` in this base class.
    """

    def __init__(self):
        """
        Constructor method.

        Deliberately returns nothing: Python raises ``TypeError`` when
        ``__init__`` returns anything other than ``None``.
        """

    def run_tests(self):
        """
        Runs bias tests.

        This is an empty method that needs to be implemented by subclasses.
        """
        return None

    def plot(self):
        """
        This is an empty method that can be implemented by subclasses.
        """
        return None

    def evaluate_dataset(self):
        """
        This is an empty method that can be implemented by subclasses.
        """
        return None
class SpeakerBiasTest(BiasTest):
    """
    The primary purpose of the SpeakerBiasTest class is the implementation of
    the run_tests() method, which performs the bias tests.

    :param scores: Either path to csv or txt file or a Pandas DataFrame that
        includes information on the reference and test utterances as well as
        corresponding labels and scores; labels have to be either {-1,1} or
        {0,1}
    :type scores: str or DataFrame
    :param config_file: path to yaml config file
    :type config_file: str
    """

    def __init__(self, scores, config_file):
        """
        Constructor method.

        Loads the config, the scores and the speaker metadata, validates them
        with :py:meth:`_check_input`, normalises the column names and prepares
        the names of the output files. The results directory named in the
        config is created if it does not exist yet.

        :raises TypeError: if ``scores`` is neither a path nor a DataFrame
        """
        self.error_rates_by_speaker_group = dict()
        self.metrics = pd.DataFrame()
        # Filled by run_tests() or, on demand, by evaluate_dataset().
        self.scores_by_speaker_groups = None

        self.config = load_config(config_file)
        try:
            self.id_delimiter = self.config["id_delimiter"]
        except KeyError:
            self.id_delimiter = "/"

        scores_input = load_data(scores)
        speaker_metadata_input = load_data(self.config['speaker_metadata_file'])

        self._check_input(scores_input, speaker_metadata_input)

        # scores_input columns selection, reordering and renaming
        score_column_names = {
            self.config["label_column"]: "label",
            self.config["reference_filepath_column"]: "ref",
            self.config["test_filepath_column"]: "test",
            self.config["scores_column"]: "score",
        }
        scores_input = scores_input[list(score_column_names)]
        self.scores = scores_input.rename(columns=score_column_names)
        self.scores = self.scores.astype({"ref": "str", "test": "str"})

        # speaker_metadata_input column selection, reordering, renaming id.
        # Build a new list so the select_columns entry of the loaded config
        # is not modified as a side effect.
        metadata_columns = [self.config["id_column"]] + list(self.config["select_columns"])

        # Delete metadata rows that include NaN, None or empty strings in the
        # selected columns. The non-inplace calls avoid modifying a selection
        # of the loaded metadata frame in place.
        speaker_metadata_input = speaker_metadata_input[metadata_columns].replace(' ', np.nan)
        if speaker_metadata_input.isnull().values.any():
            print("Selected Columns in Metadata File contain NaNs or Empty Cells. We recommend a dataset evaluation.")
            speaker_metadata_input = speaker_metadata_input.dropna()

        self.speaker_metadata = speaker_metadata_input.rename(columns={self.config["id_column"]: "id"})
        self.speaker_metadata = self.speaker_metadata.astype({"id": "str"})

        config_file_name = Path(config_file).stem

        if isinstance(scores, (str, os.PathLike)):
            scores_file_name = Path(scores).stem
        elif isinstance(scores, pd.DataFrame):
            # No file name available: label the outputs with the current time.
            scores_file_name = datetime.now().strftime("%d_%m_%Y_%H_%M_%S")
        else:
            raise TypeError("scores must be a path to a scores file or a pandas DataFrame")

        # make sure the results directory exists
        os.makedirs(os.path.expanduser(self.config["results_dir"]), exist_ok=True)

        # dataset_evaluation will later be turned into a results file rather than a log file
        if self.config["dataset_evaluation"]:
            self._dataset_eval_log_file = "dataset_eval_" + config_file_name + "_" + scores_file_name + ".log"
        else:
            self._dataset_eval_log_file = None

        self._biastest_results_file = "biastest_results_" + config_file_name + "_" + scores_file_name + ".csv"

    @staticmethod
    def _exit_with_error(message):
        """Print ``message`` and terminate the process with exit status 1."""
        print(message)
        sys.exit(1)

    def _check_input(self, scores_input, speaker_metadata_input):
        """
        Check that requirements for performing evaluation are fulfilled, e.g.
        parameters of scores, speaker metadata and config are specified
        correctly.

        Missing config keys and missing columns are reported on stdout and end
        the process via ``sys.exit(1)``; wrongly typed or out-of-range config
        values raise ``ValueError``.

        :param scores_input: DataFrame that contains reference and test
            utterances and corresponding labels and scores
        :type scores_input: DataFrame
        :param speaker_metadata_input: DataFrame that contains speaker metadata
            with speaker ids and speaker groups attributes as specified in
            config file
        :type speaker_metadata_input: DataFrame
        :raises ValueError: if select_columns is not a list, speaker_groups is
            not a list of lists, or a DCF PTarget is not strictly between 0
            and 1
        """
        # check config file
        for required_key in ("id_column", "select_columns", "speaker_groups"):
            try:
                self.config[required_key]
            except KeyError:
                self._exit_with_error("Error: " + required_key + " not specified in config file")

        select_columns = self.config["select_columns"]
        if not isinstance(select_columns, list):
            raise ValueError("Select Columns in config file must be a list")

        if not all(isinstance(el, list) for el in self.config["speaker_groups"]):
            raise ValueError("Speaker Groups in config file must be a list of lists")

        speaker_group_list = [speaker_group
                              for group_sublist in self.config["speaker_groups"]
                              for speaker_group in group_sublist]
        for speaker_group in np.unique(speaker_group_list):
            if speaker_group not in select_columns:
                self._exit_with_error("Error: " + speaker_group
                                      + " not found in select_columns as specified in config file")

        # check if dcf costs PTarget is between 0 and 1
        for dcf_costs in self.config["dcf_costs"]:
            if dcf_costs[0] <= 0.0 or dcf_costs[0] >= 1.0:
                # ValueError matches the other config checks and is still an Exception.
                raise ValueError("PTarget in DCF Costs needs to be between 0 and 1")

        # check scores_input
        scores_columns = list(scores_input.columns)
        required_scores_columns = (
            ("reference filepath", "reference_filepath_column"),
            ("test filepath", "test_filepath_column"),
            ("label", "label_column"),
            ("scores", "scores_column"),
        )
        for description, config_key in required_scores_columns:
            column_name = self.config[config_key]
            if column_name not in scores_columns:
                self._exit_with_error("Error: " + description + " column '" + column_name
                                      + "' as specified in config file not found in scores file")

        # check metadata_input
        metadata_columns = list(speaker_metadata_input.columns)
        if self.config["id_column"] not in metadata_columns:
            self._exit_with_error("Error: id column '" + self.config["id_column"]
                                  + "' as specified in config file not found in metadata file")

        for select_column in select_columns:
            if select_column not in metadata_columns:
                self._exit_with_error("Error: '" + select_column
                                      + "' in select_columns as specified in config file not found in metadata file")

        return

    def _split_scores(self):
        """Group the scores by the speaker groups named in the config."""
        return split_scores_by_speaker_groups(self.scores,
                                              self.speaker_metadata,
                                              self.config['speaker_groups'],
                                              id_delimiter=self.id_delimiter)

    def run_tests(self):
        """
        Main method of the SpeakerBiasTest class which performs bias evaluation
        and tests. This function calls :py:func:`evaluate.evaluate_scores` from
        :py:mod:`evaluate.py` for the overall dataset. Later subgroups are
        constructed using :py:func:`groups.split_scores_by_speaker_groups` from
        :py:mod:`groups.py`. These subgroup scores are again evaluated using
        :py:func:`evaluate.evaluate_scores`. Lastly metric ratios are computed
        calling :py:func:`metrics.compute_metrics_ratios` from
        :py:mod:`metrics.py`.

        Results of an earlier call are discarded first, so calling the method
        repeatedly does not duplicate columns or rows.

        :returns: biastest_results_file to the results directory as specified
            in config.yaml, the name of the file contains the config filename
            and the scores filename. If a scores dataframe was provided instead
            of a scores filename the results file contains the date and time of
            the evaluation
        :rtype: csv_file
        """
        print("Running bias test on scores")

        # Start from a clean slate so a repeated call does not append to old results.
        self.error_rates_by_speaker_group = dict()
        self.metrics = pd.DataFrame()

        dcf_costs = self.config['dcf_costs']

        # Calculate average metrics
        fprs, fnrs, thresholds, metric_scores, metric_thresholds = evaluate_scores(self.scores['score'],
                                                                                   self.scores['label'],
                                                                                   dcf_costs)
        self.error_rates_by_speaker_group["average"] = pd.DataFrame({'FPRS': fprs,
                                                                     'FNRS': fnrs,
                                                                     'Thresholds': thresholds})

        # add string to prepare for SpeakerGroup row
        # for metrics first row is EER, after that follow order of self.config.dcf_costs
        self.metrics['thresholds'] = ["thresholds"] + metric_thresholds
        self.metrics['average'] = ["average"] + metric_scores

        # Calculate metrics for each group
        self.scores_by_speaker_groups = self._split_scores()

        for group, subgroups in self.scores_by_speaker_groups.items():
            for subgroup, label_score_list in subgroups.items():
                labels, scores = zip(*label_score_list)

                if all(np.isnan(labels)) or all(np.isnan(scores)):
                    # Nothing to evaluate: one NaN for the EER plus one per DCF cost.
                    fprs = []
                    fnrs = []
                    thresholds = []
                    metric_scores = [np.nan] * (len(dcf_costs) + 1)
                else:
                    # NOTE(review): with threshold_values set, evaluate_scores is
                    # unpacked into four values instead of five, and the passed
                    # column still carries the "thresholds" header cell -
                    # confirm both against evaluate.py.
                    fprs, fnrs, thresholds, metric_scores = evaluate_scores(
                        scores, labels, dcf_costs, threshold_values=self.metrics['thresholds'])

                subgroup_rates = pd.DataFrame({'Subgroup': subgroup,
                                               'FPRS': fprs,
                                               'FNRS': fnrs,
                                               'Thresholds': thresholds})

                # if group in keys add to existing DataFrame otherwise create new key
                if group in self.error_rates_by_speaker_group:
                    subgroup_rates = pd.concat([self.error_rates_by_speaker_group[group], subgroup_rates])
                self.error_rates_by_speaker_group[group] = subgroup_rates

                # for metrics first row is eer, after that follow order of self.config.dcf_costs
                # concat instead of column assignment to avoid performance issues
                self.metrics = pd.concat([self.metrics,
                                          pd.Series([group] + metric_scores).rename(subgroup)],
                                         axis=1)

        # format metrics and metrics ratios
        metrics_ratios = compute_metrics_ratios(self.metrics).T
        metrics_ratios.columns = ["speaker_groups", "EER ratio"] + ["DCF ratio " + str(cost) for cost in dcf_costs]

        metrics_out = self.metrics.T
        metrics_out.columns = ["speaker_groups", "EER"] + ["DCF " + str(cost) for cost in dcf_costs]

        output = metrics_out.rename_axis('group_name').reset_index().merge(
            metrics_ratios.rename_axis('group_name').reset_index())

        # write metrics and metrics ratios to biastest results file; expand "~"
        # the same way the constructor does when it creates the directory
        results_path = os.path.join(os.path.expanduser(self.config["results_dir"]),
                                    self._biastest_results_file)
        write_data(output, results_path)

        # calculate a bias test score: function in metrics which takes output of compute_metrics_ratios

        print("Bias test finished. Results saved to " + results_path)

        return

    def evaluate_dataset(self):
        """
        Evaluate the dataset per speaker group.

        Passes the scores grouped by speaker group and the dataset evaluation
        log file name to
        :py:func:`dataset_evaluate.evaluate_scores_by_speaker_groups`. The
        grouping is computed here if :py:meth:`run_tests` has not been called
        yet.
        """
        # TODO: implement method
        if getattr(self, "scores_by_speaker_groups", None) is None:
            self.scores_by_speaker_groups = self._split_scores()

        evaluate_scores_by_speaker_groups(self.scores_by_speaker_groups, self._dataset_eval_log_file)

        return