# FBDF / app.py
# Firas HADJ KACEM
# made a small comment
# 8e83be6
import gradio as gr
import pandas as pd
import numpy as np
import torch
# from transformers import AutoTokenizer, AutoModelForSequenceClassification
# import json
from scipy.spatial.distance import jensenshannon, cosine
# import shap
import os
from backend.model_manager import ModelManager
from backend.data_manager import DataManager
from backend.helpers import jensen_shannon_distance
# Shared module-level instances used by the functions below: `model_manager`
# is asked for models, bias analyzers and the list of model configs;
# `data_manager` is asked for dataset sentences and per-sentence data.
model_manager = ModelManager()
data_manager = DataManager()
def load_datasets():
    """Report that the sample datasets are ready.

    This is a stub: it performs no loading itself and unconditionally
    returns ``True``.
    """
    return True
def load_model(model_name):
    """Fetch the wrapped model and tokenizer for *model_name*.

    Delegates to ``model_manager.load_model``. Any exception is printed and
    turned into a ``(None, None)`` result so callers can fall back gracefully.
    """
    try:
        loaded_model, loaded_tokenizer = model_manager.load_model(model_name)
    except Exception as exc:
        print(f"Error loading model {model_name}: {exc}")
        return None, None
    return loaded_model, loaded_tokenizer
def get_sentiment_prediction(text, model, tokenizer):
    """Return the model's sentiment prediction for *text*.

    The text is embedded in a fixed financial-sentiment prompt and passed to
    ``model.generate(prompt=...)``; whatever that call returns is handed back
    unchanged. ``tokenizer`` is accepted for interface symmetry but is not
    used here.

    Fallbacks:
    * ``model is None``  -> a dummy prediction labelled ``"NM"``.
    * any exception      -> the error is printed and a prediction labelled
      ``"NA"`` with all-zero probabilities is returned.
    """
    if model is None:
        # No model available: hand back placeholder values for the demo.
        placeholder = dict.fromkeys(("Negative", "Neutral", "Positive"), 0.01)
        return {"label": "NM", "probabilities": placeholder}

    try:
        # Instruction prefix + statement + answer cue, as one template.
        full_prompt = (
            "Analyze the sentiment of this statement extracted from a financial news article. "
            "Provide your answer as either negative, positive, or neutral.. "
            f"Text: {text}.. Answer: "
        )
        return model.generate(prompt=full_prompt)
    except Exception as exc:
        print(f"Error in prediction: {exc}")
        zeros = dict.fromkeys(("Negative", "Neutral", "Positive"), 0.0)
        return {"label": "NA", "probabilities": zeros}
def calculate_distances(orig_probs, mut_probs):
    """Compare two label->probability dicts.

    Returns a ``(js_distance, cosine_similarity)`` tuple, where the first
    item comes from ``jensen_shannon_distance(orig_probs, mut_probs)`` and the
    second is ``1 - scipy.spatial.distance.cosine`` of the two value vectors.

    If anything raises, the error is printed and ``(0.0, 1.0)`` is returned.
    """
    try:
        js_distance = jensen_shannon_distance(orig_probs, mut_probs)

        # Pair the probabilities by label, not by insertion order: two dicts
        # holding the same labels in a different order must not be compared
        # label-against-wrong-label. When the label sets differ there is no
        # key-based pairing, so keep the positional pairing.
        if orig_probs.keys() == mut_probs.keys():
            mut_values = [mut_probs[label] for label in orig_probs]
        else:
            mut_values = list(mut_probs.values())

        orig_array = np.array(list(orig_probs.values()))
        mut_array = np.array(mut_values)
        # scipy's `cosine` is a distance; similarity is its complement.
        cos_sim = 1 - cosine(orig_array, mut_array)
        return js_distance, cos_sim
    except Exception as e:
        print(f"Error calculating distances: {e}")
        return 0.0, 1.0
def load_bias_dictionary():
    """Build the set of lower-cased bias terms.

    For each of the ``gender``, ``age`` and ``race`` categories the file
    ``data/bias/<category>/<category>_terms.csv`` is read if it exists. Terms
    come from the ``term`` column, or from the first column when there is no
    ``term`` column. Missing cells are skipped and surrounding whitespace is
    removed.

    A file that cannot be read or parsed is reported with ``print`` and
    skipped; the remaining categories are still loaded.

    A built-in list of common terms is always merged into the result.
    NOTE(review): the flattened source does not show whether this list was
    meant to be added only after a load error - confirm the intended scope.

    Returns:
        set[str]: the collected terms.
    """
    bias_terms = set()
    bias_dir = "data/bias"

    for category in ("gender", "age", "race"):
        file_path = os.path.join(bias_dir, category, f"{category}_terms.csv")
        if not os.path.exists(file_path):
            continue
        # One try per file so a single malformed CSV does not discard the
        # categories that follow it.
        try:
            df = pd.read_csv(file_path)
            if 'term' in df.columns:
                column = df['term']
            elif len(df.columns) > 0:
                # Use first column if 'term' column doesn't exist
                column = df.iloc[:, 0]
            else:
                continue
            # dropna + astype(str): keep NaN out of the set and make `.str`
            # usable even when pandas inferred a non-string dtype.
            cleaned = column.dropna().astype(str).str.strip().str.lower()
            bias_terms.update(term for term in cleaned if term)
        except Exception as e:
            print(f"[v0] Error loading bias dictionary: {e}")

    # Common bias terms that are always part of the dictionary.
    bias_terms.update(['people', 'person', 'man', 'woman', 'male', 'female', 'young', 'old', 'white', 'black', 'asian', 'hispanic', 'russian', 'american', 'european'])
    return bias_terms
def find_bias_tokens_in_sentence(sentence, bias_dictionary):
    """Locate dictionary terms among the whitespace-separated words of *sentence*.

    The sentence is lower-cased and split on whitespace; each word has leading
    and trailing punctuation removed before the membership test against
    *bias_dictionary*. The result maps each matched (cleaned) word to a dict
    with its word ``position`` and the lower-cased ``original_word`` including
    punctuation. If a term occurs more than once, the last occurrence wins.
    """
    candidates = (
        (index, raw, raw.strip('.,!?;:"()[]{}'))
        for index, raw in enumerate(sentence.lower().split())
    )
    return {
        core: {'position': index, 'original_word': raw}
        for index, raw, core in candidates
        if core in bias_dictionary
    }
def calculate_shapley_values(original_text, atomic1_text, atomic2_text, intersectional_text, model_name):
    """Run the bias analyzer of *model_name* over the four sentence variants.

    An analyzer is obtained from ``model_manager.get_bias_analyzer`` and its
    ``analyze_sentence`` is called once per variant (``sampling_ratio=0.1``,
    ``max_combinations=50``). Progress is printed along the way.

    Returns:
        ``{"sentence_results": {...}}`` keyed by ``original``, ``atomic1``,
        ``atomic2`` and ``intersectional``. A variant whose analysis raises
        gets ``{'Bias Token Ranks': {}}`` instead. If the overall
        setup fails, ``{"error": <message>}`` is returned and the traceback is
        printed.
    """
    variants = (
        ('original', original_text),
        ('atomic1', atomic1_text),
        ('atomic2', atomic2_text),
        ('intersectional', intersectional_text),
    )
    try:
        print(f"[v0] Starting SHAP calculation for model: {model_name}")
        bias_analyzer = model_manager.get_bias_analyzer(model_name)
        print("[v0] BiasAnalyzer created successfully")

        per_sentence = {}
        for label, text in variants:
            # A failure on one variant must not abort the others.
            try:
                print(f"[v0] Analyzing {label}: {text}")
                per_sentence[label] = bias_analyzer.analyze_sentence(
                    text,
                    sampling_ratio=0.1,
                    max_combinations=50
                )
                print(f"[v0] {label} analysis completed")
            except Exception as exc:
                print(f"[v0] Error analyzing {label}: {exc}")
                per_sentence[label] = {'Bias Token Ranks': {}}

        print("[v0] SHAP analysis completed successfully")
        return {"sentence_results": per_sentence}
    except Exception as exc:
        print(f"[v0] Error calculating SHAP: {exc}")
        import traceback
        print(f"[v0] Full traceback: {traceback.format_exc()}")
        return {"error": str(exc)}
# (mutation key, SHAP result key, display title) for every sentence variation,
# in the order in which they are reported.
_VARIATIONS = (
    ("original", "original", "Original Sentence"),
    ("atomic_1", "atomic1", "Atomic Mutation 1"),
    ("atomic_2", "atomic2", "Atomic Mutation 2"),
    ("intersectional", "intersectional", "Intersectional Mutation"),
)

# (mutation key, display title) for every original-vs-mutant label comparison.
_BIAS_CHECKS = (
    ("atomic_1", "Atomic Bias 1"),
    ("atomic_2", "Atomic Bias 2"),
    ("intersectional", "Intersectional Bias"),
)


def _format_variation(title, sentence, prediction):
    """Render one sentence variation (quote, label, probabilities) as markdown."""
    label = prediction["label"].upper()
    probabilities = format_probabilities(prediction["probabilities"])
    return (
        f"### 🔸 {title}\n\n"
        f"> {sentence}\n\n"
        f"**Prediction:** `{label}` | **Probabilities:** {probabilities}\n\n"
    )


def _format_bias_verdict(title, is_biased, original_label, mutated_label):
    """Render the detected / not-detected line for one mutation."""
    verdict = "🚨 DETECTED" if is_biased else "✅ NOT DETECTED"
    return (
        f"**🔍 {title}:** {verdict}\n"
        f"*Original: {original_label} → Mutated: {mutated_label}*\n\n"
    )


def _shap_failure_note(reason):
    """Render the markdown section shown when the SHAP analysis fails."""
    return (
        "---\n\n"
        "## 🎯 SHAP Values Analysis\n\n"
        f"*SHAP calculation failed: {reason}*\n\n"
        "*This feature requires significant computational resources.*\n\n"
    )


def _format_bias_tokens(sentence_results, sentence_type, title):
    """Render the bias tokens the analyzer reported for one sentence variation."""
    section = f"### 🔸 {title}\n\n"
    bias_token_ranks = sentence_results.get(sentence_type, {}).get('Bias Token Ranks', {})
    if not bias_token_ranks:
        return section + "*No bias tokens detected*\n\n"

    rows = []
    for token, token_data in bias_token_ranks.items():
        shap_val = token_data.get('shapley_value', 0.0)
        rank = token_data.get('rank', 'N/A')
        percentile = token_data.get('percentile', 'N/A')
        token_type = token_data.get('type', 'single_word')
        magnitude = abs(shap_val)
        importance_level = "🔴 HIGH" if magnitude > 0.1 else "🟡 MED" if magnitude > 0.05 else "🟢 LOW"
        rows.append(
            f"**{token}** | `{shap_val:.3f}` | {importance_level} | "
            f"*rank: {rank} ({percentile}%) | type: {token_type}*\n\n"
        )
    return section + "".join(rows)


def _first_matching_token(ranks, predicate):
    """Return the first ``(token, data)`` whose lower-cased token satisfies *predicate*, else None."""
    for token, token_data in ranks.items():
        if predicate(token.lower()):
            return token, token_data
    return None


def _describe_rank_change(original_ranks, mutated_ranks, word, replacement):
    """Describe how the rank of *replacement* compares with the rank of *word*.

    *replacement* is looked up in *mutated_ranks*, first by exact
    (case-insensitive) match and then by substring match in either direction,
    to cope with tokens that were split or merged differently. Returns a
    markdown bullet without a trailing newline, or ``None`` when no token
    matches.
    """
    word_lower = word.lower()
    replacement_lower = replacement.lower()

    found = _first_matching_token(mutated_ranks, lambda t: t == replacement_lower)
    if found is not None:
        label = f"**{replacement}**"
    else:
        found = _first_matching_token(
            mutated_ranks,
            lambda t: replacement_lower in t or t in replacement_lower,
        )
        if found is None:
            return None
        label = f"**{found[0]}** (from {replacement})"
    mutated_rank = found[1]['rank']

    original = _first_matching_token(original_ranks, lambda t: t == word_lower)
    if original is None:
        return f"- {label}: New bias token (rank: {mutated_rank})"

    original_rank = original[1]['rank']
    rank_diff = mutated_rank - original_rank
    # A smaller rank number means the token moved up in importance.
    change_indicator = "📈" if rank_diff < 0 else "📉" if rank_diff > 0 else "➡️"
    # `:+` (not `:+d`) so a non-integer rank does not raise.
    return f"- {label}: {original_rank} → {mutated_rank} {change_indicator} (Δ{rank_diff:+})"


def _format_rank_changes(sentence_results, sentence_data):
    """Render the rank-change section for the two mutation words."""
    word1 = sentence_data.get("word_1", "Word 1")
    replacement1 = sentence_data.get("replacement_1", "Replacement 1")
    word2 = sentence_data.get("word_2", "Word 2")
    replacement2 = sentence_data.get("replacement_2", "Replacement 2")

    def ranks_for(key):
        return sentence_results.get(key, {}).get('Bias Token Ranks', {})

    original_ranks = ranks_for('original')
    lines = ["### 🔸 Bias Token Rank Changes by Mutation Words\n\n"]
    any_change_found = False

    atomic_checks = (
        ("Word 1", ranks_for('atomic1'), word1, replacement1),
        ("Word 2", ranks_for('atomic2'), word2, replacement2),
    )
    for heading, mutated_ranks, word, replacement in atomic_checks:
        lines.append(f"**{heading} ({word} → {replacement}):**\n\n")
        change = _describe_rank_change(original_ranks, mutated_ranks, word, replacement)
        if change is None:
            lines.append(f"- **{replacement}**: Not detected as bias token\n\n")
        else:
            lines.append(change + "\n\n")
            any_change_found = True

    lines.append(
        f"**Intersectional Mutation ({word1}→{replacement1} + {word2}→{replacement2}):**\n\n"
    )
    intersectional_ranks = ranks_for('intersectional')
    candidates = (
        _describe_rank_change(original_ranks, intersectional_ranks, word1, replacement1),
        _describe_rank_change(original_ranks, intersectional_ranks, word2, replacement2),
    )
    intersectional_changes = [change for change in candidates if change is not None]
    if intersectional_changes:
        lines.extend(change + "\n" for change in intersectional_changes)
        any_change_found = True
    else:
        lines.append("*No bias tokens detected for intersectional mutation words*\n\n")

    if not any_change_found:
        lines.append("*No bias tokens detected for mutation words*\n")
    return "".join(lines)


def run_bias_detection(dataset_name, sentence_display, model_name, show_distances, show_shapley):
    """Run the bias analysis for one sentence and return a markdown report.

    The selected sentence is looked up through ``data_manager``; its original
    text, two atomic mutants and the intersectional mutant are each classified
    with the selected model. Bias is reported for a mutant when its predicted
    label differs from the label of the original sentence.

    Args:
        dataset_name: dataset passed to ``data_manager``.
        sentence_display: the sentence exactly as listed by
            ``data_manager.get_dataset_sentences``.
        model_name: model passed to ``load_model``.
        show_distances: when truthy, append Jensen-Shannon distance and cosine
            similarity between the original and each mutant.
        show_shapley: when truthy, append the SHAP bias-token analysis.

    Returns:
        str: the markdown report, or a one-line ``"Error: ..."`` message when
        the sentence cannot be resolved. A failure inside the SHAP analysis
        does not discard the report; a failure note is appended instead.
    """
    try:
        sentences = data_manager.get_dataset_sentences(dataset_name)
        sentence_index = sentences.index(sentence_display)
        sentence_data = data_manager.get_sentence_data(dataset_name, sentence_index)
        # The data stores the atomic mutants under "mutant_1"/"mutant_2".
        mutations = {
            "original": sentence_data["original"],
            "atomic_1": sentence_data["mutant_1"],
            "atomic_2": sentence_data["mutant_2"],
            "intersectional": sentence_data["intersectional"],
        }
    except Exception as e:
        print(f"[v0] Error parsing sentence selection: {e}")
        return f"Error: Could not parse sentence selection - {str(e)}"

    model, tokenizer = load_model(model_name)

    predictions = {
        key: get_sentiment_prediction(text, model, tokenizer)
        for key, text in mutations.items()
    }
    original_label = predictions["original"]["label"]
    bias_flags = {
        key: predictions[key]["label"] != original_label
        for key, _ in _BIAS_CHECKS
    }
    bias_detected = any(bias_flags.values())

    # Blank lines between markdown blocks are deliberate: a "---" directly
    # under a text line would turn that line into a heading, and a blockquote
    # would swallow the line that follows it.
    parts = [
        "# 🔬 Bias Detection Analysis\n\n",
        f"**Model:** {model_name} | **Dataset:** {dataset_name}\n\n",
    ]
    if model is None:
        # Without a model get_sentiment_prediction returns placeholders; say
        # so instead of presenting them as real results.
        parts.append(
            "> ⚠️ The model could not be loaded; the predictions below are placeholder values.\n\n"
        )

    parts.append("---\n\n## 📊 Sentence Variations\n\n")
    for mutation_key, _, title in _VARIATIONS:
        parts.append(_format_variation(title, mutations[mutation_key], predictions[mutation_key]))

    headline = "⚠️ BIAS DETECTED" if bias_detected else "✅ NO BIAS DETECTED"
    parts.append(f"---\n\n## 🎯 Bias Detection Results\n\n### {headline}\n\n")
    for mutation_key, title in _BIAS_CHECKS:
        parts.append(
            _format_bias_verdict(
                title,
                bias_flags[mutation_key],
                original_label,
                predictions[mutation_key]["label"],
            )
        )

    if show_distances:
        original_probs = predictions["original"]["probabilities"]
        parts.append("---\n\n## 📏 Distance Metrics Analysis\n\n")
        for mutation_key, _, title in _VARIATIONS[1:]:
            js_distance, cos_sim = calculate_distances(
                original_probs, predictions[mutation_key]["probabilities"]
            )
            parts.append(
                f"### 🔸 {title}\n\n"
                f"**Jensen-Shannon Distance:** `{js_distance:.6f}` | "
                f"**Cosine Similarity:** `{cos_sim:.6f}`\n\n"
            )

    if show_shapley:
        try:
            shap_data = calculate_shapley_values(
                mutations["original"],
                mutations["atomic_1"],
                mutations["atomic_2"],
                mutations["intersectional"],
                model_name
            )
            if "error" in shap_data:
                parts.append(_shap_failure_note(shap_data["error"]))
            else:
                sentence_results = shap_data.get("sentence_results", {})
                parts.append("---\n\n## 🎯 SHAP Values Analysis - Bias Tokens Only\n\n")
                for _, shap_key, title in _VARIATIONS:
                    parts.append(
                        _format_bias_tokens(sentence_results, shap_key, f"{title} Bias Tokens")
                    )
                parts.append(_format_rank_changes(sentence_results, sentence_data))
        except Exception as e:
            # Sections appended before the failure are kept.
            parts.append(_shap_failure_note(str(e)))

    return "".join(parts)
def format_probabilities(probs_dict):
    """Render a label->probability mapping as ``"label: 0.123456 | ..."``.

    Entries appear in the mapping's iteration order, each probability with six
    decimal places; an empty mapping yields an empty string.
    """
    rendered = (
        f"{label}: {probability:.6f}"
        for label, probability in probs_dict.items()
    )
    return " | ".join(rendered)
def update_sentences(dataset_name):
    """Build the sentence dropdown for *dataset_name*.

    The choices come from ``data_manager.get_dataset_sentences``; the first
    sentence is preselected when there is one. On any exception the error is
    printed and an empty dropdown is returned.
    """
    try:
        options = data_manager.get_dataset_sentences(dataset_name)
        if options:
            preselected = options[0]
        else:
            preselected = None
        return gr.Dropdown(choices=options, value=preselected)
    except Exception as exc:
        print(f"[v0] Error updating sentences: {exc}")
        return gr.Dropdown(choices=[], value=None)
# Initialize datasets
load_datasets()

# Create Gradio interface.
# Layout: a configuration column (dataset, sentence, model, options, run
# button) next to a wider results column. The emoji in the labels are written
# as real Unicode characters rather than mis-decoded byte sequences.
with gr.Blocks(title="Bias Detection Framework", theme=gr.themes.Soft()) as demo:
    gr.Markdown("# 🔬 Financial Bias Detection Framework")
    gr.Markdown("Demo interface for detecting bias in financial sentiment analysis models")

    with gr.Row():
        with gr.Column(scale=1):
            gr.Markdown("## ⚙️ Configuration")
            dataset_dropdown = gr.Dropdown(
                choices=["FPB", "FinSen"],
                label="📊 Select Dataset",
                value="FPB"
            )
            # Starts empty; filled by update_sentences on page load and
            # whenever the dataset changes.
            sentence_dropdown = gr.Dropdown(
                choices=[],
                label="📝 Select Sentence",
                interactive=True
            )
            model_dropdown = gr.Dropdown(
                choices=list(model_manager.model_configs.keys()),
                label="🤖 Select Model",
                value="FinBERT"
            )
            show_distances = gr.Checkbox(
                label="📏 Show Original to Mutated Distances",
                value=False
            )
            show_shapley = gr.Checkbox(
                label="🎯 Show SHAP Values",
                value=False
            )
            analyze_btn = gr.Button("🚀 Run Bias Analysis", variant="primary")

        with gr.Column(scale=2):
            gr.Markdown("## 📋 Results")
            results_output = gr.Markdown("")

    # Event handlers
    dataset_dropdown.change(
        fn=update_sentences,
        inputs=[dataset_dropdown],
        outputs=[sentence_dropdown]
    )
    analyze_btn.click(
        fn=run_bias_detection,
        inputs=[dataset_dropdown, sentence_dropdown, model_dropdown, show_distances, show_shapley],
        outputs=[results_output]
    )
    # Initialize sentence dropdown
    demo.load(
        fn=update_sentences,
        inputs=[dataset_dropdown],
        outputs=[sentence_dropdown]
    )
# Launch the Gradio app only when this file is executed as a script, not when
# it is imported.
if __name__ == "__main__":
    demo.launch()