
API reference

Consistency evaluation

Streamlit labelling app

Small tool for labeling data.

pip install streamlit
streamlit run labellingapp.py

Expects the metadata CSV and the topic CSV in the data directory.

update_this_relevancy(var_, topic_)

Helper function to bind the variables to scope.

Source code in tools/labellingapp.py
def update_this_relevancy(var_, topic_):
    """Helper function to bind the variables to scope."""
    return lambda: update_relevancy(f"q{var_}", topic_)
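
The helper exists because Python closures bind names late: if the checkbox callbacks were wired up with a bare lambda inside the labelling loop, every callback would see only the final values of the loop variables. A minimal, self-contained sketch of the pitfall and the fix (the names below are illustrative, not taken from the app):

# Without binding, all callbacks close over the same loop variables.
unbound = [lambda: f"q{var_}-{topic_}" for var_ in (1, 2) for topic_ in ("a", "b")]
print([f() for f in unbound])  # ['q2-b', 'q2-b', 'q2-b', 'q2-b']

# A factory like update_this_relevancy freezes the current values instead.
def bind(var_, topic_):
    return lambda: f"q{var_}-{topic_}"

bound = [bind(var_, topic_) for var_ in (1, 2) for topic_ in ("a", "b")]
print([f() for f in bound])  # ['q1-a', 'q1-b', 'q2-a', 'q2-b']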

Merging labels

merge_labels()

Description: Merge labels from multiple JSON label files into a single dictionary.

Source code in tools/merge_labels.py
def merge_labels() -> dict[Any, list]:
    """
    Description : Merge labels from multiple JSON label files into a single dictionary.
    """
    # Read all files and merge them into a single dictionary
    merged_labels = defaultdict(set)
    for file in all_files:
        with open(file, "r") as f:
            try:
                data = json.load(f)
                for key, values in data.items():
                    merged_labels[key].update(values)
            except json.JSONDecodeError:
                print(f"Error reading {file}")
    # Remove empty lists
    merged_labels = {k: list(v) for k, v in merged_labels.items() if v}

    # Reverse the dictionary so we have topic -> [dataset_ids]
    reversed_labels = defaultdict(set)
    for key, values in merged_labels.items():
        for value in values:
            reversed_labels[value].add(key)

    # Convert sets to lists for each value
    return {k: list(v) for k, v in reversed_labels.items()}
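
A worked sketch of what the merge does, using two made-up annotator files (in the real script the contents come from all_files, a module-level list of JSON paths):

from collections import defaultdict

# Hypothetical contents of two annotators' label files: dataset id -> topics.
file_a = {"2": ["math"], "31": ["finance", "credit"]}
file_b = {"2": ["math", "education"], "61": ["biology"]}

merged = defaultdict(set)
for data in (file_a, file_b):
    for key, values in data.items():
        merged[key].update(values)

# Reverse to topic -> [dataset_ids], as merge_labels returns.
reversed_labels = defaultdict(set)
for key, values in merged.items():
    for value in values:
        reversed_labels[value].add(key)

print({k: sorted(v) for k, v in reversed_labels.items()})
# -> {'math': ['2'], 'education': ['2'], 'finance': ['31'], 'credit': ['31'], 'biology': ['61']}
#    (key order may vary)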

Run Batch Training

ExperimentRunner

Description: This class is used to run all the experiments. If you want to modify any behavior, change the functions in this class according to what you want. You may also want to check out ResponseParser.

Source code in evaluation/training_utils.py
class ExperimentRunner:
    """
    Description: This class is used to run all the experiments. If you want to modify any behavior, change the functions in this class according to what you want.
    You may also want to check out ResponseParser.
    """

    def __init__(
        self,
        config,
        eval_path,
        queries,
        list_of_embedding_models,
        list_of_llm_models,
        types_of_llm_apply=None,
        subset_ids=None,
        use_cached_experiment=False,
        custom_name=None,
    ):
        if types_of_llm_apply is None:
            types_of_llm_apply = [True, False, None]
        self.config = config
        self.eval_path = eval_path
        self.queries = queries
        self.list_of_embedding_models = list_of_embedding_models
        self.list_of_llm_models = list_of_llm_models
        self.subset_ids = subset_ids
        self.use_cached_experiment = use_cached_experiment
        self.custom_name = custom_name
        self.types_of_llm_apply = types_of_llm_apply

    def run_experiments(self):
        # across all embedding models
        for embedding_model in tqdm(
            self.list_of_embedding_models,
            desc="Embedding Models",
        ):
            main_experiment_directory = (
                self.eval_path / f"{process_embedding_model_name_hf(embedding_model)}"
            )

            os.makedirs(main_experiment_directory, exist_ok=True)

            # update the config with the new experiment directories
            self.config["data_dir"] = str(main_experiment_directory)
            self.config["persist_dir"] = str(main_experiment_directory / "chroma_db")

            # save training details and config in a dataframe
            config_df = pd.DataFrame.from_dict(
                self.config, orient="index"
            ).reset_index()
            config_df.columns = ["Hyperparameter", "Value"]

            # load the persistent database using ChromaDB
            client = chromadb.PersistentClient(path=self.config["persist_dir"])

            # Note: I was not sure how to move this to the next loop; we need the QA setup going forward.
            # Check if the chroma db as well as metadata files exist.
            if os.path.exists(self.config["persist_dir"]) and os.path.exists(
                main_experiment_directory / "all_dataset_description.csv"
            ):
                # load the qa from the persistent database if it exists. Disabling training does this for us.
                self.config["training"] = False

                qa_dataset_handler = QASetup(
                    config=self.config,
                    data_type=self.config["type_of_data"],
                    client=client,
                    subset_ids=self.subset_ids,
                )

                qa_dataset, _ = qa_dataset_handler.setup_vector_db_and_qa()
                self.config["training"] = True
            else:
                self.config["training"] = True
                qa_dataset_handler = QASetup(
                    config=self.config,
                    data_type=self.config["type_of_data"],
                    client=client,
                    subset_ids=self.subset_ids,
                )

                qa_dataset, _ = qa_dataset_handler.setup_vector_db_and_qa()

            # across all llm models
            for llm_model in tqdm(self.list_of_llm_models, desc="LLM Models"):
                # update the config with the new embedding and llm models
                self.config["embedding_model"] = embedding_model
                self.config["llm_model"] = llm_model

                # create a new experiment directory using a combination of the embedding model and llm model names
                experiment_name = f"{process_embedding_model_name_hf(embedding_model)}_{process_llm_model_name_ollama(llm_model)}"
                if self.custom_name is not None:
                    experiment_path = (
                        # main_experiment_directory / (self.custom_name + experiment_name)
                        main_experiment_directory
                        / f"{self.custom_name}@{experiment_name}"
                    )
                else:
                    experiment_path = main_experiment_directory / experiment_name
                os.makedirs(experiment_path, exist_ok=True)
                config_df.to_csv(experiment_path / "config.csv", index=False)

                # we do not want to run the models again for no reason, so we use existing caches if they exist.
                if self.use_cached_experiment and os.path.exists(
                    experiment_path / "results.csv"
                ):
                    print(
                        f"Experiment {experiment_name} already exists. Skipping... To disable this behavior, set use_cached_experiment = False"
                    )
                    continue
                else:
                    data_metadata_path = (
                        Path(self.config["data_dir"]) / "all_dataset_description.csv"
                    )
                    data_metadata = pd.read_csv(data_metadata_path)

                    combined_df = self.aggregate_multiple_queries(
                        qa_dataset=qa_dataset,
                        data_metadata=data_metadata,
                        types_of_llm_apply=self.types_of_llm_apply,
                    )

                    combined_df.to_csv(experiment_path / "results.csv")

    def aggregate_multiple_queries(self, qa_dataset, data_metadata, types_of_llm_apply):
        """
        Description: Aggregate the results of multiple queries into a single dataframe and count the number of times a dataset appears in the results. This was done here and not in evaluate to make it a little easier to manage as each of them requires a different chroma_db and config
        """

        combined_results = pd.DataFrame()

        # Initialize the ResponseParser once per query type
        response_parsers = {
            apply_llm: ResponseParser(
                query_type=self.config["type_of_data"], apply_llm_before_rag=apply_llm
            )
            for apply_llm in types_of_llm_apply
        }

        for query in tqdm(self.queries, total=len(self.queries), leave=True):
            for apply_llm_before_rag in types_of_llm_apply:
                combined_results = self.run_query(
                    apply_llm_before_rag,
                    combined_results,
                    data_metadata,
                    qa_dataset,
                    query,
                    response_parsers,
                )

        # Concatenate all collected DataFrames at once
        # combined_df = pd.concat(combined_results, ignore_index=True)

        return combined_results

    def run_query(
        self,
        apply_llm_before_rag,
        combined_results,
        data_metadata,
        qa_dataset,
        query,
        response_parsers,
    ):
        response_parser = response_parsers[apply_llm_before_rag]
        result_data_frame, _ = QueryProcessor(
            query=query,
            qa=qa_dataset,
            type_of_query="dataset",
            config=self.config,
        ).get_result_from_query()
        response_parser.rag_response = {
            "initial_response": result_data_frame["id"].to_list()
        }
        response_parser.fetch_llm_response(query)
        result_data_frame = response_parser.parse_and_update_response(
            data_metadata
        ).copy()[["did", "name"]]
        result_data_frame["query"] = query
        result_data_frame["llm_model"] = self.config["llm_model"]
        result_data_frame["embedding_model"] = self.config["embedding_model"]
        result_data_frame["llm_before_rag"] = apply_llm_before_rag
        combined_results = pd.concat(
            [combined_results, result_data_frame], ignore_index=True
        )
        return combined_results
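
A minimal usage sketch, assuming a config dict and a query_key_dict built elsewhere (as in evaluation/experiments.py); the path and model lists below are placeholders. The types_of_llm_apply values follow the experiment docstrings: True applies the LLM as a filter before the RAG step, False applies it as a reranker afterwards, and None skips the LLM entirely.

from pathlib import Path

runner = ExperimentRunner(
    config=config,                            # hypothetical config dict
    eval_path=Path("../data/evaluation"),     # hypothetical evaluation directory
    queries=query_key_dict.keys(),            # queries to evaluate
    list_of_embedding_models=["BAAI/bge-large-en-v1.5"],
    list_of_llm_models=["llama3"],
    types_of_llm_apply=[True, False, None],   # before RAG / after RAG / no LLM
    subset_ids=None,
    use_cached_experiment=True,
)
runner.run_experiments()   # writes one results.csv per embedding/LLM combination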

aggregate_multiple_queries(qa_dataset, data_metadata, types_of_llm_apply)

Description: Aggregate the results of multiple queries into a single dataframe and count the number of times a dataset appears in the results. This was done here and not in evaluate to make it a little easier to manage as each of them requires a different chroma_db and config

Source code in evaluation/training_utils.py
def aggregate_multiple_queries(self, qa_dataset, data_metadata, types_of_llm_apply):
    """
    Description: Aggregate the results of multiple queries into a single dataframe and count the number of times a dataset appears in the results. This was done here and not in evaluate to make it a little easier to manage as each of them requires a different chroma_db and config
    """

    combined_results = pd.DataFrame()

    # Initialize the ResponseParser once per query type
    response_parsers = {
        apply_llm: ResponseParser(
            query_type=self.config["type_of_data"], apply_llm_before_rag=apply_llm
        )
        for apply_llm in types_of_llm_apply
    }

    for query in tqdm(self.queries, total=len(self.queries), leave=True):
        for apply_llm_before_rag in types_of_llm_apply:
            combined_results = self.run_query(
                apply_llm_before_rag,
                combined_results,
                data_metadata,
                qa_dataset,
                query,
                response_parsers,
            )

    # Concatenate all collected DataFrames at once
    # combined_df = pd.concat(combined_results, ignore_index=True)

    return combined_results
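
For orientation, the returned frame stacks one row per retrieved dataset per (query, LLM mode) pair; the columns are set in run_query above, while the values below are made up:

did     name           query                           llm_model  embedding_model          llm_before_rag
45617   breast-cancer  find me a dataset about cancer  llama3     BAAI/bge-large-en-v1.5   False
2       anneal         find me a dataset about cancer  llama3     BAAI/bge-large-en-v1.5   True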

ResponseParser

Bases: ResponseParser

Source code in evaluation/training_utils.py
class ResponseParser(ResponseParser):
    def load_paths(self):
        """
        Description: Load paths from paths.json
        """
        with open("../frontend/paths.json", "r") as file:
            return json.load(file)

    def parse_and_update_response(self, metadata):
        """
        Description: Parse the response from the RAG and LLM services and update the metadata based on the response
        """
        if self.rag_response is not None and self.llm_response is not None:
            if not self.apply_llm_before_rag:
                filtered_metadata = metadata[
                    metadata["did"].isin(self.rag_response["initial_response"])
                ]
                llm_parser = LLMResponseParser(self.llm_response)
                llm_parser.subset_cols = ["did", "name"]

                if self.query_type.lower() == "dataset":
                    llm_parser.get_attributes_from_response()
                    return llm_parser.update_subset_cols(filtered_metadata)

            elif self.apply_llm_before_rag:
                llm_parser = LLMResponseParser(self.llm_response)
                llm_parser.subset_cols = ["did", "name"]
                llm_parser.get_attributes_from_response()
                filtered_metadata = llm_parser.update_subset_cols(metadata)

                return filtered_metadata[
                    filtered_metadata["did"].isin(self.rag_response["initial_response"])
                ]

            elif self.apply_llm_before_rag is None:
                # if no llm response is required, return the initial response
                return metadata
        elif (
            self.rag_response is not None and self.structured_query_response is not None
        ):
            return metadata[["did", "name"]]
        else:
            return metadata
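
A hedged driver sketch mirroring ExperimentRunner.run_query above; data_metadata and the ids are hypothetical, and fetch_llm_response is inherited from the base ResponseParser class:

parser = ResponseParser(query_type="dataset", apply_llm_before_rag=False)
parser.rag_response = {"initial_response": [2, 31, 45617]}   # dataset ids returned by the RAG step
parser.fetch_llm_response("find me a dataset about cancer")  # inherited from the base class
subset = parser.parse_and_update_response(data_metadata)[["did", "name"]]

# apply_llm_before_rag=False : keep the RAG hits, then apply the LLM afterwards (reranker)
# apply_llm_before_rag=True  : apply the LLM attribute filter first, then intersect with the RAG hits
# apply_llm_before_rag=None  : return the metadata unchanged (no LLM step)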

load_paths()

Description: Load paths from paths.json

Source code in evaluation/training_utils.py
def load_paths(self):
    """
    Description: Load paths from paths.json
    """
    with open("../frontend/paths.json", "r") as file:
        return json.load(file)

parse_and_update_response(metadata)

Description: Parse the response from the RAG and LLM services and update the metadata based on the response

Source code in evaluation/training_utils.py
def parse_and_update_response(self, metadata):
    """
    Description: Parse the response from the RAG and LLM services and update the metadata based on the response
    """
    if self.rag_response is not None and self.llm_response is not None:
        if not self.apply_llm_before_rag:
            filtered_metadata = metadata[
                metadata["did"].isin(self.rag_response["initial_response"])
            ]
            llm_parser = LLMResponseParser(self.llm_response)
            llm_parser.subset_cols = ["did", "name"]

            if self.query_type.lower() == "dataset":
                llm_parser.get_attributes_from_response()
                return llm_parser.update_subset_cols(filtered_metadata)

        elif self.apply_llm_before_rag:
            llm_parser = LLMResponseParser(self.llm_response)
            llm_parser.subset_cols = ["did", "name"]
            llm_parser.get_attributes_from_response()
            filtered_metadata = llm_parser.update_subset_cols(metadata)

            return filtered_metadata[
                filtered_metadata["did"].isin(self.rag_response["initial_response"])
            ]

        elif self.apply_llm_before_rag is None:
            # if no llm response is required, return the initial response
            return metadata
    elif (
        self.rag_response is not None and self.structured_query_response is not None
    ):
        return metadata[["did", "name"]]
    else:
        return metadata

exp_0(process_query_elastic_search, eval_path, query_key_dict)

EXPERIMENT 0: Get results from Elasticsearch.

Source code in evaluation/experiments.py
def exp_0(process_query_elastic_search, eval_path, query_key_dict):
    """
    EXPERIMENT 0
    Get results from elastic search
    """
    # cols = ,did,name,query,llm_model,embedding_model,llm_before_rag
    # for every query, get the results from elastic search
    if not os.path.exists(eval_path / "elasticsearch" / "elasticsearch"):
        os.makedirs(eval_path / "elasticsearch" / "elasticsearch")
    output_file_path = eval_path / "elasticsearch" / "elasticsearch" / "results.csv"
    # check if the file exists and skip
    if not os.path.exists(output_file_path):
        with open(output_file_path, "w") as f:
            f.write("did,name,query,llm_model,embedding_model,llm_before_rag\n")

            # Use ThreadPoolExecutor to parallelize requests
            with ThreadPoolExecutor(max_workers=10) as executor:
                # Start a future for each query
                futures = {
                    executor.submit(
                        process_query_elastic_search, query, dataset_id
                    ): query
                    for query, dataset_id in query_key_dict.items()
                }

                for future in tqdm(as_completed(futures), total=len(futures)):
                    result = future.result()
                    # Save the results to a CSV file
                    for id, query in result:
                        f.write(f"{id},None,{query},es,es,None\n")

exp_1(eval_path, config, list_of_embedding_models, list_of_llm_models, subset_ids, query_key_dict)

EXPERIMENT 1
Main evaluation loop used to run the base experiments with different embedding and LLM models. Takes into account the following:
original data ingestion pipeline: a string of all metadata fields and the dataset description is combined and embedded with no pre-processing
list_of_embedding_models = ["BAAI/bge-large-en-v1.5", "BAAI/bge-base-en-v1.5", "Snowflake/snowflake-arctic-embed-l"]
list_of_llm_models = ["llama3", "phi3"]
types_of_llm_apply: LLM applied as a filter before the RAG pipeline, as a reranker after the RAG pipeline, or not used at all

Source code in evaluation/experiments.py
def exp_1(
    eval_path,
    config,
    list_of_embedding_models,
    list_of_llm_models,
    subset_ids,
    query_key_dict,
):
    """
    EXPERIMENT 1
    Main evaluation loop that is used to run the base experiments using different models and embeddings.
    Takes into account the following:
    original data ingestion pipeline : combine a string of all metadata fields and the dataset description and embeds them with no pre-processing
    list_of_embedding_models = [
        "BAAI/bge-large-en-v1.5",
        "BAAI/bge-base-en-v1.5",
        "Snowflake/snowflake-arctic-embed-l",
    ]
    list_of_llm_models = ["llama3", "phi3"]
    types_of_llm_apply : llm applied as filter before the RAG pipeline, llm applied as reranker after the RAG pipeline, llm not used at all
    """

    expRunner = ExperimentRunner(
        config=config,
        eval_path=eval_path,
        queries=query_key_dict.keys(),
        list_of_embedding_models=list_of_embedding_models,
        list_of_llm_models=list_of_llm_models,
        subset_ids=subset_ids,
        use_cached_experiment=True,
    )
    expRunner.run_experiments()

exp_2(eval_path, config, subset_ids, query_key_dict)

EXPERIMENT 2
Evaluating temperature = 1 (the default was 0.95). Takes into account the following:
original data ingestion pipeline: a string of all metadata fields and the dataset description is combined and embedded with no pre-processing
list_of_embedding_models = ["BAAI/bge-large-en-v1.5"]
list_of_llm_models = ["llama3"]
types_of_llm_apply: LLM applied as a filter before the RAG pipeline, as a reranker after the RAG pipeline, or not used at all

Source code in evaluation/experiments.py
def exp_2(eval_path, config, subset_ids, query_key_dict):
    """
    EXPERIMENT 2
    Evaluating temperature = 1 (default was 0.95)
    Takes into account the following:
    original data ingestion pipeline : combine a string of all metadata fields and the dataset description and embeds them with no pre-processing
    list_of_embedding_models = [
        "BAAI/bge-large-en-v1.5",
    ]
    list_of_llm_models = ["llama3"]
    types_of_llm_apply : llm applied as filter before the RAG pipeline, llm applied as reranker after the RAG pipeline, llm not used at all
    """

    list_of_embedding_models = [
        "BAAI/bge-large-en-v1.5",
    ]
    list_of_llm_models = ["llama3"]
    config["temperature"] = 1

    expRunner = ExperimentRunner(
        config=config,
        eval_path=eval_path,
        queries=query_key_dict.keys(),
        list_of_embedding_models=list_of_embedding_models,
        list_of_llm_models=list_of_llm_models,
        subset_ids=subset_ids,
        use_cached_experiment=True,
        custom_name="temperature_1",
    )
    expRunner.run_experiments()

    # reset the temperature to the default value
    config["temperature"] = 0.95

exp_3(eval_path, config, subset_ids, query_key_dict)

EXPERIMENT 3
Evaluating search type [mmr, similarity_score_threshold] (the default was similarity). Takes into account the following:
original data ingestion pipeline: a string of all metadata fields and the dataset description is combined and embedded with no pre-processing
list_of_embedding_models = ["BAAI/bge-large-en-v1.5"]
list_of_llm_models = ["llama3"]
types_of_llm_apply: LLM applied as a reranker after the RAG pipeline

Source code in evaluation/experiments.py
def exp_3(eval_path, config, subset_ids, query_key_dict):
    """
    EXPERIMENT 3
    Evaluating search type [mmr, similarity_score_threshold] (default was similarity)
    Takes into account the following:
    original data ingestion pipeline : combine a string of all metadata fields and the dataset description and embeds them with no pre-processing
    list_of_embedding_models = [
        "BAAI/bge-large-en-v1.5",
    ]
    list_of_llm_models = ["llama3"]
    types_of_llm_apply : llm applied as reranker after the RAG pipeline
    """

    list_of_embedding_models = [
        "BAAI/bge-large-en-v1.5",
    ]
    list_of_llm_models = ["llama3"]
    types_of_llm_apply = [False]
    types_of_search = ["mmr", "similarity_score_threshold"]

    for type_of_search in types_of_search:
        config["search_type"] = type_of_search
        expRunner = ExperimentRunner(
            config=config,
            eval_path=eval_path,
            queries=query_key_dict.keys(),
            list_of_embedding_models=list_of_embedding_models,
            list_of_llm_models=list_of_llm_models,
            subset_ids=subset_ids,
            use_cached_experiment=True,
            custom_name=f"{type_of_search}_search",
            types_of_llm_apply=types_of_llm_apply,
        )
        expRunner.run_experiments()

    # reset the search type to the default value
    config["search_type"] = "similarity"

exp_4(eval_path, config, subset_ids, query_key_dict)

EXPERIMENT 4
Evaluating chunk size: the default is 1000, trying out 512 and 128. Takes into account the following:
original data ingestion pipeline: a string of all metadata fields and the dataset description is combined and embedded with no pre-processing
list_of_embedding_models = ["BAAI/bge-large-en-v1.5"]
list_of_llm_models = ["llama3"]
types_of_llm_apply: LLM applied as a reranker after the RAG pipeline

Source code in evaluation/experiments.py
def exp_4(eval_path, config, subset_ids, query_key_dict):
    """
    EXPERIMENT 4
    Evaluating chunk size. The default is 1000, trying out 512,128
    Takes into account the following:
    original data ingestion pipeline : combine a string of all metadata fields and the dataset description and embeds them with no pre-processing
    list_of_embedding_models = [
        "BAAI/bge-large-en-v1.5",
    ]
    list_of_llm_models = ["llama3"]
    types_of_llm_apply : llm applied as reranker after the RAG pipeline
    """

    list_of_embedding_models = [
        "BAAI/bge-large-en-v1.5",
    ]
    list_of_llm_models = ["llama3"]
    types_of_llm_apply = [False]
    types_of_chunk = [512, 128]
    for type_of_chunk in types_of_chunk:
        config["chunk_size"] = type_of_chunk
        expRunner = ExperimentRunner(
            config=config,
            eval_path=eval_path,
            queries=query_key_dict.keys(),
            list_of_embedding_models=list_of_embedding_models,
            list_of_llm_models=list_of_llm_models,
            subset_ids=subset_ids,
            use_cached_experiment=True,
            custom_name=f"{type_of_chunk}_chunk",
            types_of_llm_apply=types_of_llm_apply,
        )
        expRunner.run_experiments()

    # reset the search type to the default value
    config["chunk_size"] = 1000

get_queries(query_templates, load_eval_queries)

Get queries from the dataset templates and format it

Source code in evaluation/training_utils.py
def get_queries(query_templates, load_eval_queries):
    """
    Get queries from the dataset templates and format it
    """
    query_key_dict = {}
    for template in query_templates:
        for row in load_eval_queries.itertuples():
            new_query = f"{template} {row[1]}".strip()
            if new_query not in query_key_dict:
                query_key_dict[new_query.strip()] = row[2]
    return query_key_dict
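
A worked sketch with made-up inputs: query_templates comes from query_templates.txt and load_eval_queries from merged_labels.csv (columns Topics and Dataset IDs), so each template is prefixed to each topic:

import pandas as pd

query_templates = ["find me a dataset about", "search for datasets on"]
load_eval_queries = pd.DataFrame(
    {"Topics": ["cancer", "math"], "Dataset IDs": ["45617,43383", "2,45748"]}
)

print(get_queries(query_templates, load_eval_queries))
# {'find me a dataset about cancer': '45617,43383',
#  'find me a dataset about math': '2,45748',
#  'search for datasets on cancer': '45617,43383',
#  'search for datasets on math': '2,45748'}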

ollama_setup(list_of_llm_models)

Description: Setup Ollama server and pull the llm_model that is being used

Source code in evaluation/training_utils.py
def ollama_setup(list_of_llm_models: list):
    """
    Description: Setup Ollama server and pull the llm_model that is being used
    """
    os.system("ollama serve&")
    print("Waiting for Ollama server to be active...")
    # os.system returns the shell's exit status (an int), so wait until `ollama list` succeeds
    while os.system("ollama list | grep 'NAME'") != 0:
        pass

    for llm_model in list_of_llm_models:
        os.system(f"ollama pull {llm_model}")

process_embedding_model_name_hf(name)

Description: This function processes the name of the embedding model from Hugging Face to use as experiment name.

Input: name (str) - name of the embedding model from Hugging Face.

Returns: name (str) - processed name of the embedding model.

Source code in evaluation/training_utils.py
def process_embedding_model_name_hf(name: str) -> str:
    """
    Description: This function processes the name of the embedding model from Hugging Face to use as experiment name.

    Input: name (str) - name of the embedding model from Hugging Face.

    Returns: name (str) - processed name of the embedding model.
    """
    return name.replace("/", "_")

process_llm_model_name_ollama(name)

Description: This function processes the name of the llm model from Ollama to use as experiment name.

Input: name (str) - name of the llm model from Ollama.

Returns: name (str) - processed name of the llm model.

Source code in evaluation/training_utils.py
def process_llm_model_name_ollama(name: str) -> str:
    """
    Description: This function processes the name of the llm model from Ollama to use as experiment name.

    Input: name (str) - name of the llm model from Ollama.

    Returns: name (str) - processed name of the llm model.
    """
    return name.replace(":", "_")

process_query_elastic_search(query, dataset_id)

Get the results from the Elasticsearch OpenML server

Source code in evaluation/training_utils.py
def process_query_elastic_search(query, dataset_id):
    """
    Get the results from the Elasticsearch OpenML server
    """
    res = get_elastic_search_results(query)
    ids = [val["_id"] for val in res]
    return [(id, query) for id in ids]

Evaluation Utils

EvaluationProcessor

Description: Process all the evaluated results, add the required metrics and save results as a csv/generate plots

Source code in evaluation/evaluation_utils.py
class EvaluationProcessor:
    """
    Description: Process all the evaluated results, add the required metrics and save results as a csv/generate plots
    """

    def __init__(self, eval_path, metrics=None, sort_by="precision"):
        if metrics is None:
            metrics = ["precision", "recall", "map"]
        self.eval_path = eval_path
        self.load_eval_queries = self.load_queries_from_csv()
        self.query_templates = self.load_query_templates()
        self.query_key_dict = self.create_query_key_dict()
        self.metrics = metrics
        self.sort_by = sort_by

        # Define a dictionary to map metric names to their corresponding methods
        self.metric_methods = {
            "precision": self.add_precision,
            "recall": self.add_recall,
            "map": self.add_map,
        }

    def run(self):
        """
        Description: Load files, Run the evaluation process and display the results

        """
        csv_files = self.load_result_files()
        results_df = self.generate_results(csv_files)
        results_display = self.display_results(results_df)
        return results_display

    def load_result_files(self):
        """
        Description: Find all the csv files in the evaluation directory.

        """
        return glob.glob(str(self.eval_path / "*/*/results.csv"))

    def generate_results(self, csv_files):
        """
        Description: Load the results from the csv files, group them and compute metrics for each group. Then merge the results and sort them by the metric specified.
        """
        merged_df = pd.DataFrame()

        for exp_path in tqdm(csv_files):
            exp = pd.read_csv(exp_path).rename(columns={"did": "y_pred"})
            exp["exp_folder_name"] = Path(exp_path).parent.name
            exp["custom_experiment"] = ""
            # split exp_folder_name by @ to get extra information
            exp["custom_experiment"] = exp["exp_folder_name"].apply(
                lambda x: x.split("@")[0] if "@" in x else ""
            )
            exp.drop("exp_folder_name", axis=1, inplace=True)
            exp = self.preprocess_results(exp)

            grouped_results_for_y_true_and_pred = exp.groupby(
                [
                    "embedding_model",
                    "llm_model",
                    "query",
                    "llm_before_rag",
                    "custom_experiment",
                ]
            ).agg({"y_true": ",".join, "y_pred": ",".join})

            grouped_results_for_y_true_and_pred = self.add_metrics(
                grouped_results_for_y_true_and_pred
            )

            # aggregate by computing the average of the metrics for each group
            grouped_results_for_y_true_and_pred = (
                grouped_results_for_y_true_and_pred.groupby(
                    [
                        "embedding_model",
                        "llm_model",
                        "llm_before_rag",
                        "custom_experiment",
                    ]
                ).agg({metric: "mean" for metric in self.metrics})
            )

            # merge with the results
            merged_df = pd.concat([merged_df, grouped_results_for_y_true_and_pred])

            # sort by metric
            if self.sort_by in self.metrics:
                merged_df = merged_df.sort_values(by=self.sort_by, ascending=False)
        return merged_df

    def add_metrics(self, grouped_results_for_y_true_and_pred):
        # Iterate over the metrics and apply the corresponding method if it exists
        for metric in self.metrics:
            if metric in self.metric_methods:
                grouped_results_for_y_true_and_pred = self.metric_methods[metric](
                    grouped_results_for_y_true_and_pred
                )

        return grouped_results_for_y_true_and_pred

    def load_queries_from_csv(self):
        """
        Description: Load the queries from the csv file

        """
        return pd.read_csv(self.eval_path / "merged_labels.csv")[
            ["Topics", "Dataset IDs"]
        ]

    def load_query_templates(self):
        """
        Description: Load the query templates from the txt file. This is used to generate the queries for the evaluation process. eg: {query_template} {query}
        {find me a dataset about} {cancer}
        """
        with open(self.eval_path / "query_templates.txt", "r") as f:
            query_templates = f.readlines()
        return [x.strip() for x in query_templates]

    def create_query_key_dict(self):
        """
        Description: Use the manual evaluation to create a dictionary of queries and their corresponding ground truth dataset ids. eg: Math,"45617,43383,2,45748"
        """
        query_key_dict = {}
        for template in self.query_templates:
            for row in self.load_eval_queries.itertuples():
                new_query = f"{template} {row[1]}".strip()
                if new_query not in query_key_dict:
                    query_key_dict[new_query.strip()] = row[2]
        return query_key_dict

    def preprocess_results(self, results_df):
        """
        Description: Preprocess the results dataframe by filling missing values and converting the columns to the correct data types.
        """
        results_df["llm_before_rag"] = results_df["llm_before_rag"].fillna(
            "No LLM filtering"
        )
        results_df["y_pred"] = results_df["y_pred"].astype(str)
        results_df["query"] = results_df["query"].str.strip()
        results_df["y_true"] = results_df["query"].map(self.query_key_dict)
        return results_df

    @staticmethod
    def add_precision(grouped_df):
        """
        Description: Compute the precision metric for each group in the dataframe
        """
        grouped_df["precision"] = [
            len(set(y_true).intersection(set(y_pred))) / len(set(y_pred))
            for y_true, y_pred in zip(grouped_df["y_true"], grouped_df["y_pred"])
        ]
        return grouped_df

    @staticmethod
    def add_recall(grouped_df):
        """
        Description: Compute the recall metric for each group in the dataframe

        """
        grouped_df["recall"] = [
            len(set(y_true).intersection(set(y_pred))) / len(set(y_true))
            for y_true, y_pred in zip(grouped_df["y_true"], grouped_df["y_pred"])
        ]
        return grouped_df

    @staticmethod
    def add_map(grouped_df):
        """
        Description: Compute the mean average precision metric for each group in the dataframe
        """
        grouped_df["map"] = [
            sum(
                [
                    len(set(y_true).intersection(set(y_pred[:i]))) / i
                    for i in range(1, len(set(y_pred)))
                ]
            )
            / len(set(y_true))
            for y_true, y_pred in zip(grouped_df["y_true"], grouped_df["y_pred"])
        ]
        return grouped_df

    @staticmethod
    def display_results(results_df):
        # add more preprocessing here
        results_df = pd.DataFrame(results_df)
        # heatmap results
        # return results_df.style.background_gradient(cmap='coolwarm', axis=0)
        return results_df
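
A minimal usage sketch; the eval_path below is a placeholder and is expected to contain merged_labels.csv, query_templates.txt and the per-experiment */*/results.csv files:

from pathlib import Path

processor = EvaluationProcessor(
    eval_path=Path("../data/evaluation"),      # hypothetical
    metrics=["precision", "recall", "map"],
    sort_by="precision",
)
results = processor.run()   # dataframe of averaged metrics per experiment configuration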

add_map(grouped_df) staticmethod

Description: Compute the mean average precision metric for each group in the dataframe

Source code in evaluation/evaluation_utils.py
@staticmethod
def add_map(grouped_df):
    """
    Description: Compute the mean average precision metric for each group in the dataframe
    """
    grouped_df["map"] = [
        sum(
            [
                len(set(y_true).intersection(set(y_pred[:i]))) / i
                for i in range(1, len(set(y_pred)))
            ]
        )
        / len(set(y_true))
        for y_true, y_pred in zip(grouped_df["y_true"], grouped_df["y_pred"])
    ]
    return grouped_df

add_precision(grouped_df) staticmethod

Description: Compute the precision metric for each group in the dataframe

Source code in evaluation/evaluation_utils.py
@staticmethod
def add_precision(grouped_df):
    """
    Description: Compute the precision metric for each group in the dataframe
    """
    grouped_df["precision"] = [
        len(set(y_true).intersection(set(y_pred))) / len(set(y_pred))
        for y_true, y_pred in zip(grouped_df["y_true"], grouped_df["y_pred"])
    ]
    return grouped_df

add_recall(grouped_df) staticmethod

Description: Compute the recall metric for each group in the dataframe

Source code in evaluation/evaluation_utils.py
@staticmethod
def add_recall(grouped_df):
    """
    Description: Compute the recall metric for each group in the dataframe

    """
    grouped_df["recall"] = [
        len(set(y_true).intersection(set(y_pred))) / len(set(y_true))
        for y_true, y_pred in zip(grouped_df["y_true"], grouped_df["y_pred"])
    ]
    return grouped_df
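
A worked example of the two formulas, shown here on already-split lists of dataset ids (in the pipeline y_true and y_pred arrive as comma-joined strings):

y_true = ["2", "31", "45617"]        # ground-truth dataset ids for a query
y_pred = ["45617", "7", "31", "31"]  # retrieved dataset ids

precision = len(set(y_true) & set(y_pred)) / len(set(y_pred))  # 2 / 3
recall = len(set(y_true) & set(y_pred)) / len(set(y_true))     # 2 / 3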

create_query_key_dict()

Description: Use the manual evaluation to create a dictionary of queries and their corresponding ground truth dataset ids. eg: Math,"45617,43383,2,45748"

Source code in evaluation/evaluation_utils.py
def create_query_key_dict(self):
    """
    Description: Use the manual evaluation to create a dictionary of queries and their corresponding ground truth dataset ids. eg: Math,"45617,43383,2,45748"
    """
    query_key_dict = {}
    for template in self.query_templates:
        for row in self.load_eval_queries.itertuples():
            new_query = f"{template} {row[1]}".strip()
            if new_query not in query_key_dict:
                query_key_dict[new_query.strip()] = row[2]
    return query_key_dict

generate_results(csv_files)

Description: Load the results from the csv files, group them and compute metrics for each group. Then merge the results and sort them by the metric specified.

Source code in evaluation/evaluation_utils.py
def generate_results(self, csv_files):
    """
    Description: Load the results from the csv files, group them and compute metrics for each group. Then merge the results and sort them by the metric specified.
    """
    merged_df = pd.DataFrame()

    for exp_path in tqdm(csv_files):
        exp = pd.read_csv(exp_path).rename(columns={"did": "y_pred"})
        exp["exp_folder_name"] = Path(exp_path).parent.name
        exp["custom_experiment"] = ""
        # split exp_folder_name by @ to get extra information
        exp["custom_experiment"] = exp["exp_folder_name"].apply(
            lambda x: x.split("@")[0] if "@" in x else ""
        )
        exp.drop("exp_folder_name", axis=1, inplace=True)
        exp = self.preprocess_results(exp)

        grouped_results_for_y_true_and_pred = exp.groupby(
            [
                "embedding_model",
                "llm_model",
                "query",
                "llm_before_rag",
                "custom_experiment",
            ]
        ).agg({"y_true": ",".join, "y_pred": ",".join})

        grouped_results_for_y_true_and_pred = self.add_metrics(
            grouped_results_for_y_true_and_pred
        )

        # aggregate by computing the average of the metrics for each group
        grouped_results_for_y_true_and_pred = (
            grouped_results_for_y_true_and_pred.groupby(
                [
                    "embedding_model",
                    "llm_model",
                    "llm_before_rag",
                    "custom_experiment",
                ]
            ).agg({metric: "mean" for metric in self.metrics})
        )

        # merge with the results
        merged_df = pd.concat([merged_df, grouped_results_for_y_true_and_pred])

        # sort by metric
        if self.sort_by in self.metrics:
            merged_df = merged_df.sort_values(by=self.sort_by, ascending=False)
    return merged_df

load_queries_from_csv()

Description: Load the queries from the csv file

Source code in evaluation/evaluation_utils.py
def load_queries_from_csv(self):
    """
    Description: Load the queries from the csv file

    """
    return pd.read_csv(self.eval_path / "merged_labels.csv")[
        ["Topics", "Dataset IDs"]
    ]

load_query_templates()

Description: Load the query templates from the txt file. This is used to generate the queries for the evaluation process, e.g. {query_template} {query} gives "find me a dataset about cancer".

Source code in evaluation/evaluation_utils.py
def load_query_templates(self):
    """
    Description: Load the query templates from the txt file. This is used to generate the queries for the evaluation process. eg: {query_template} {query}
    {find me a dataset about} {cancer}
    """
    with open(self.eval_path / "query_templates.txt", "r") as f:
        query_templates = f.readlines()
    return [x.strip() for x in query_templates]
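
The file is read line by line and stripped, so a hypothetical query_templates.txt simply lists one template per line:

find me a dataset about
search for datasets on
give me datasets related to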

load_result_files()

Description: Find all the csv files in the evaluation directory.

Source code in evaluation/evaluation_utils.py
def load_result_files(self):
    """
    Description: Find all the csv files in the evaluation directory.

    """
    return glob.glob(str(self.eval_path / "*/*/results.csv"))

preprocess_results(results_df)

Description: Preprocess the results dataframe by filling missing values and converting the columns to the correct data types.

Source code in evaluation/evaluation_utils.py
def preprocess_results(self, results_df):
    """
    Description: Preprocess the results dataframe by filling missing values and converting the columns to the correct data types.
    """
    results_df["llm_before_rag"] = results_df["llm_before_rag"].fillna(
        "No LLM filtering"
    )
    results_df["y_pred"] = results_df["y_pred"].astype(str)
    results_df["query"] = results_df["query"].str.strip()
    results_df["y_true"] = results_df["query"].map(self.query_key_dict)
    return results_df

run()

Description: Load files, Run the evaluation process and display the results

Source code in evaluation/evaluation_utils.py
def run(self):
    """
    Description: Load files, Run the evaluation process and display the results

    """
    csv_files = self.load_result_files()
    results_df = self.generate_results(csv_files)
    results_display = self.display_results(results_df)
    return results_display

Elastic Search Eval