"""Builds the leaderboard dataframes from the raw evaluation result JSON files."""

import json
import os

import numpy as np
import pandas as pd

from src.about import SingleColumnTasks, SingleTableTasks, Tasks

# Model name mapping dictionary
model_names = {
    "CLAVADDPM": "ClavaDDPM",
    "RGCLD": "RGCLD",
    "MOSTLYAI": "TabularARGN",
    "RCTGAN": "RCTGAN",
    "REALTABFORMER": "REaLTabFormer",
    "SDV": "SDV",
}

# Dataset name mapping dictionary
dataset_names = {
    "airbnb-simplified_subsampled": "Airbnb",
    "Berka_subsampled": "Berka",
    "Biodegradability_v1": "Biodegradability",
    "CORA_v1": "Cora",
    "imdb_MovieLens_v1": "IMDB",
    "rossmann_subsampled": "Rossmann",
    "walmart_subsampled": "Walmart",
    "f1_subsampled": "F1",
}


def strip_emoji(text: str) -> str:
    """Removes emojis (and any other non-ASCII characters) from text and strips trailing whitespace."""
    return text.encode("ascii", "ignore").decode("ascii").rstrip()
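

# A sketch of the result-file shape that get_leaderboard_df below assumes. It is
# inferred from the key accesses in this module rather than from an official schema,
# so treat the exact keys and nesting as an assumption:
#
# {
#     "method_name": "SDV",
#     "dataset_name": "rossmann_subsampled",
#     "multi_table_metrics": {
#         "<metric>": {"<table>": {"accuracy": ..., "statistic": ...}},
#         "Trends": {"cardinality": ...},
#     },
#     "single_table_metrics": {"<metric>": {"<table>": {"accuracy": ..., "value": ...}}},
#     "single_column_metrics": {"<metric>": {"<table>": {"<column>": {"accuracy": ..., "value": ..., "statistic": ...}}}},
# }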
def get_leaderboard_df(
    results_path: str, cols: list, benchmark_cols: list
) -> tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame]:
    """Creates the single-column, single-table, and multi-table leaderboard dataframes
    from the individual experiment results."""
    # Iterate through all files in the results path and read them into json
    all_data_json = []
    res_path = os.path.join(results_path, "demo-leaderboard", "syntherela-demo")
    for entry in os.listdir(res_path):
        if entry.endswith(".json"):
            file_path = os.path.join(res_path, entry)
            with open(file_path) as fp:
                data = json.load(fp)
            all_data_json.append(data)

    multi_table_metrics = [task.value.metric for task in Tasks]
    single_table_metrics = [task.value.metric for task in SingleTableTasks]
    single_column_metrics = [task.value.metric for task in SingleColumnTasks]

    multi_table_metric_names = [task.value.col_name for task in Tasks]
    single_table_metric_names = [task.value.col_name for task in SingleTableTasks]
    single_column_metric_names = [task.value.col_name for task in SingleColumnTasks]

    # Create mappings between metrics and their display names
    multi_table_metric_mapping = dict(zip(multi_table_metrics, multi_table_metric_names))
    single_table_metric_mapping = dict(zip(single_table_metrics, single_table_metric_names))
    single_column_metric_mapping = dict(zip(single_column_metrics, single_column_metric_names))

    # Create empty dataframes with the display column names
    multitable_df = pd.DataFrame(columns=["Dataset", "Model"] + multi_table_metric_names)
    singletable_df = pd.DataFrame(columns=["Dataset", "Model"] + single_table_metric_names)
    singlecolumn_df = pd.DataFrame(columns=["Dataset", "Table", "Model"] + single_column_metric_names)

    # Iterate through all result files and add the data to the dataframes
    for data in all_data_json:
        model = data["method_name"]
        # Rename the model if it exists in the mapping dictionary
        if model.upper() in model_names:
            model = model_names[model.upper()]
        dataset = data["dataset_name"]
        # Rename the dataset if it exists in the mapping dictionary
        if dataset in dataset_names:
            dataset = dataset_names[dataset]

        row = {"Dataset": dataset, "Model": model}
        for metric in multi_table_metrics:
            stripped_metric = strip_emoji(metric)
            display_name = multi_table_metric_mapping[metric]  # Use the display name as the column
            # Special case for CardinalityShapeSimilarity, which is stored under "Trends"
            if "CardinalityShapeSimilarity" in metric:
                if "Trends" in data["multi_table_metrics"] and "cardinality" in data["multi_table_metrics"]["Trends"]:
                    row[display_name] = data["multi_table_metrics"]["Trends"]["cardinality"]
                else:
                    row[display_name] = np.nan
                continue
            if stripped_metric in data["multi_table_metrics"]:
                metric_values = []
                for table in data["multi_table_metrics"][stripped_metric].keys():
                    table_results = data["multi_table_metrics"][stripped_metric][table]
                    if "accuracy" in table_results:
                        metric_values.append(table_results["accuracy"])
                    if "statistic" in table_results:
                        metric_values.append(table_results["statistic"])
                # Average the metric over all tables, guarding against an empty list
                row[display_name] = np.mean(metric_values).round(decimals=2) if metric_values else np.nan
            else:
                row[display_name] = np.nan
        multitable_df = pd.concat([multitable_df, pd.DataFrame([row])], ignore_index=True)

        singletable_row = {"Dataset": dataset, "Model": model}
        for metric in single_table_metrics:
            stripped_metric = strip_emoji(metric)
            display_name = single_table_metric_mapping[metric]  # Use the display name as the column
            if stripped_metric in data["single_table_metrics"]:
                metric_values = []
                for table in data["single_table_metrics"][stripped_metric].keys():
                    table_results = data["single_table_metrics"][stripped_metric][table]
                    if "accuracy" in table_results:
                        metric_values.append(table_results["accuracy"])
                    if "value" in table_results:
                        metric_values.append(table_results["value"])
                singletable_row[display_name] = np.mean(metric_values).round(decimals=2) if metric_values else np.nan
            else:
                singletable_row[display_name] = np.nan
        singletable_df = pd.concat([singletable_df, pd.DataFrame([singletable_row])], ignore_index=True)
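
        # The single-column results are pivoted into a wide format below: one row per
        # (Dataset, Model, Table) combination with one column per metric. Rows are
        # created lazily the first time a table appears for any metric.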
        for metric in single_column_metrics:
            stripped_metric = strip_emoji(metric)
            display_name = single_column_metric_mapping[metric]  # Use the display name as the column
            if stripped_metric in data["single_column_metrics"]:
                for table in data["single_column_metrics"][stripped_metric].keys():
                    # Insert a row if no row for this dataset, model, and table exists yet
                    if singlecolumn_df[
                        (singlecolumn_df["Dataset"] == dataset)
                        & (singlecolumn_df["Model"] == model)
                        & (singlecolumn_df["Table"] == table)
                    ].empty:
                        singlecolumn_row = {"Dataset": dataset, "Model": model, "Table": table}
                        singlecolumn_df = pd.concat([singlecolumn_df, pd.DataFrame([singlecolumn_row])], ignore_index=True)
                    metric_values = []
                    for column in data["single_column_metrics"][stripped_metric][table].keys():
                        column_results = data["single_column_metrics"][stripped_metric][table][column]
                        if "accuracy" in column_results:
                            metric_values.append(column_results["accuracy"])
                        if "value" in column_results:
                            metric_values.append(column_results["value"])
                        if "statistic" in column_results:
                            metric_values.append(column_results["statistic"])
                    # Save the mean metric value to the row matching this dataset, model, and table
                    if metric_values:
                        singlecolumn_df.loc[
                            (singlecolumn_df["Dataset"] == dataset)
                            & (singlecolumn_df["Model"] == model)
                            & (singlecolumn_df["Table"] == table),
                            display_name,
                        ] = np.mean(metric_values).round(decimals=2)

    return singlecolumn_df, singletable_df, multitable_df
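

# A minimal usage sketch, assuming a local results checkout; the path below is
# hypothetical. The cols and benchmark_cols arguments are currently unused by
# get_leaderboard_df, so empty lists suffice here.
if __name__ == "__main__":
    singlecolumn_df, singletable_df, multitable_df = get_leaderboard_df(
        results_path="results",  # hypothetical directory containing demo-leaderboard/syntherela-demo/*.json
        cols=[],
        benchmark_cols=[],
    )
    print(multitable_df.head())
    print(singletable_df.head())
    print(singlecolumn_df.head())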