Spaces:
Running
Running
import streamlit as st | |
from urllib.parse import unquote | |
import os | |
import json | |
from utils.utils_trad import get_total_audio_duration_by_user, list_audio_files_by_title, get_processed_audio_files_by_user_and_title, get_audio_url, save_annotation | |
from dotenv import load_dotenv | |
# Load settings from the local ".env" file into the process environment.
load_dotenv(".env")

# S3 connection settings, each read from an environment variable.
S3_BUCKET = os.getenv("S3_BUCKET")
S3_PREFIX = os.getenv("S3_PREFIX")
AWS_ACCESS_KEY_ID = os.getenv("AWS_ACCESS_KEY_ID")
AWS_SECRET_ACCESS_KEY = os.getenv("AWS_SECRET_ACCESS_KEY")
ENDPOINT_URL = os.getenv("AWS_ENDPOINT_URL_S3")

# Folder name used below when building annotation file paths in the bucket.
ANNOTATIONS_PREFIX = "annotations"

import s3fs

# Filesystem-style handle on the S3 endpoint, used for the status and
# annotation files.
fs = s3fs.S3FileSystem(
    key=AWS_ACCESS_KEY_ID,
    secret=AWS_SECRET_ACCESS_KEY,
    client_kwargs={"endpoint_url": ENDPOINT_URL},
)

# Refuse to render the page when any required setting is missing or empty.
if not (
    S3_BUCKET
    and S3_PREFIX
    and AWS_ACCESS_KEY_ID
    and AWS_SECRET_ACCESS_KEY
    and ENDPOINT_URL
):
    st.error("Veuillez configurer correctement les variables d'environnement S3.")
    st.stop()
def get_completed_titles():
    """Return the titles that have no audio left to process.

    Reads ``title_completion_status.json`` at the root of the S3 bucket; the
    file is expected to hold a JSON object mapping each title to a
    completion flag.

    Returns:
        list: The titles whose flag is truthy. An empty list when the status
        file is missing, is not valid JSON, or does not hold a JSON object.
    """
    status_file = f"{S3_BUCKET}/title_completion_status.json"
    try:
        with fs.open(status_file, 'r') as f:
            status = json.load(f)
    except (FileNotFoundError, json.JSONDecodeError):
        return []

    # Valid JSON that is not an object (``null``, a list, ...) has no
    # ``.items()``; treat it like an unreadable file instead of crashing.
    if not isinstance(status, dict):
        return []

    return [title for title, is_completed in status.items() if is_completed]
def save_title_completion_status(title, is_completed):
    """Record the completion flag of a title in the shared JSON status file.

    The whole mapping stored in ``title_completion_status.json`` (bucket
    root) is read, the entry for ``title`` is set, and the mapping is written
    back.

    Args:
        title: Title whose flag is being set; used as the JSON key.
        is_completed: Value stored for that title.

    Returns:
        None. Failures are reported through Streamlit messages, not raised.

    Note:
        This is a plain read-modify-write with no locking, so two sessions
        saving at the same moment can still overwrite each other's update.
    """
    status_file = f"{S3_BUCKET}/title_completion_status.json"

    try:
        with fs.open(status_file, 'r') as f:
            status = json.load(f)
    except FileNotFoundError:
        # First save ever: start from an empty mapping.
        status = {}
    except json.JSONDecodeError as e:
        # The file exists but is unusable; nothing worth preserving.
        st.warning(f"Erreur lors de la lecture du statut: {e}")
        status = {}
    except Exception as e:
        # Unknown read failure (network, permissions, ...). Writing now
        # would replace the existing file with a single entry and erase the
        # flags of every other title, so give up instead.
        st.error(f"Erreur lors de la lecture du statut: {e}")
        return

    if not isinstance(status, dict):
        # Valid JSON but not an object: cannot be updated by key.
        st.warning("Erreur lors de la lecture du statut: contenu JSON inattendu")
        status = {}

    status[title] = is_completed
    print(f"Mise à jour du statut pour {title}: {is_completed}")
    print(f"Statut complet: {status}")

    try:
        with fs.open(status_file, 'w') as f:
            json.dump(status, f)
        print(f"Statut sauvegardé avec succès dans {status_file}")
    except Exception as e:
        st.error(f"Erreur lors de la sauvegarde du statut: {e}")
# Page-wide configuration, followed by the page title and introduction.
st.set_page_config(layout="wide", page_title="Travaux Audio")
st.title("🗣️ Travaux Audio - Transcription & Traduction")

# Introduction shown under the title (Markdown).
intro_markdown = """
Bienvenue sur la page des **Travaux Audio** du projet **MooreFrCollection**.
> 📝 Votre mission : écouter les audios mooré, écrire leur **transcription** (en mooré) et leur **traduction** (en français).
"""
st.markdown(intro_markdown)
# Seed the per-session values the rest of the page relies on.
for state_key, initial_value in (
    ("user_logged_in", False),
    ("current_username", ""),
    ("completed_titles", set()),
):
    if state_key not in st.session_state:
        st.session_state[state_key] = initial_value

# Until a name has been provided, show only the login form and halt the
# script so nothing below it is rendered.
if not st.session_state.user_logged_in:
    with st.form("login_form"):
        input_username = st.text_input("Entrez votre nom ou pseudo pour contribuer :", key="input_username")
        submit_button = st.form_submit_button("✅ Commencer à contribuer")
        if submit_button:
            # Strip surrounding whitespace so that a blank-looking name is
            # rejected and "Alice " is not treated as a different
            # contributor than "Alice".
            cleaned_username = (input_username or "").strip()
            if not cleaned_username:
                st.error("Merci d'entrer un nom avant de continuer.")
            else:
                st.session_state.user_logged_in = True
                st.session_state.current_username = cleaned_username
                st.rerun()
    st.stop()
# Greet the contributor and show how much audio they have handled so far.
username = st.session_state["current_username"]
st.success(f"👤 Connecté en tant que: **{username}**")

user_duration_minutes = get_total_audio_duration_by_user(username)
st.info(f"🎯 Vous avez déjà traité environ **{user_duration_minutes:.1f} minutes** d'audio.")

# Logging out clears the stored name and restarts the script, which brings
# the login form back.
switch_user_clicked = st.button("👋 Changer d'utilisateur")
if switch_user_clicked:
    st.session_state["user_logged_in"] = False
    st.session_state["current_username"] = ""
    st.rerun()
# Keep the title -> audio paths listing in the session so it is not fetched
# again on every rerun. An empty listing is NOT kept: it is fetched again on
# the next run so the page recovers once audio becomes available.
if "audio_titles" not in st.session_state or not st.session_state.audio_titles:
    st.session_state.audio_titles = list_audio_files_by_title()
audio_titles = st.session_state.audio_titles

if not audio_titles:
    st.warning("Aucun audio disponible pour l'instant.")
    st.stop()

globally_completed_titles = get_completed_titles()
print(f"Titres globalement terminés: {globally_completed_titles}")

# Titles finished in this session or flagged as finished for everyone are
# not offered; a single set makes each membership test O(1).
excluded_titles = set(globally_completed_titles).union(st.session_state.completed_titles)
available_titles = [title for title in audio_titles if title not in excluded_titles]

if not available_titles:
    st.success("🎉 Félicitations ! Tous les groupes d'audio disponibles sont terminés.")
    st.stop()
# Audio title selection: pre-select the previously chosen title when it is
# still on offer, otherwise fall back to the first entry.
default_index = (
    available_titles.index(st.session_state["selected_title"])
    if "selected_title" in st.session_state
    and st.session_state["selected_title"] in available_titles
    else 0
)

selected_title = st.selectbox(
    "Choisissez un groupe audio :",
    available_titles,
    index=default_index,
    key="audio_group",
)

# Remember the choice for the next rerun.
st.session_state["selected_title"] = selected_title
# Split the selected group's audio files into those this user has already
# handled and those still to do (files are matched by base name).
audio_paths = audio_titles[selected_title]
processed_files = get_processed_audio_files_by_user_and_title(username, selected_title)
print(f"Fichiers déjà traités pour {username} et {selected_title}: {processed_files}")

unprocessed_audio_paths = [
    candidate
    for candidate in audio_paths
    if os.path.basename(candidate) not in processed_files
]
print(f"Fichiers non traités: {len(unprocessed_audio_paths)} sur {len(audio_paths)}")

# Nothing left for this user: hide the group for the rest of the session
# and, when every audio has an annotation file, flag it as done for all.
if not unprocessed_audio_paths:
    st.success(f"🎉 Vous avez déjà terminé tous les audios du groupe '{selected_title}'!")
    st.session_state.completed_titles.add(selected_title)

    for audio_path in audio_paths:
        annotation_path = (
            f"{S3_BUCKET}/{ANNOTATIONS_PREFIX}/{selected_title}/"
            f"{os.path.basename(audio_path)}.json"
        )
        if not fs.exists(annotation_path):
            # Stop at the first audio without an annotation file.
            print(f"Fichier non annoté: {annotation_path}")
            break
    else:
        # The loop ran to the end: every annotation file exists.
        print(f"Tous les fichiers du titre {selected_title} sont annotés")
        save_title_completion_status(selected_title, True)

    if st.button("Continuer avec un autre groupe (Terminé)"):
        st.rerun()
    st.stop()
# Progress is tracked by file name rather than by list position.
# `unprocessed_audio_paths` is rebuilt on every rerun and may or may not
# already leave out files saved earlier in this session (that depends on
# get_processed_audio_files_by_user_and_title, which is not visible here —
# TODO confirm). A positional counter advanced after each save can therefore
# skip entries and declare the group finished too early; a set of submitted
# names is correct in both cases.
submitted_key = f"submitted_{selected_title}"
if submitted_key not in st.session_state:
    st.session_state[submitted_key] = set()
submitted_names = st.session_state[submitted_key]

pending_audio_paths = [
    path
    for path in unprocessed_audio_paths
    if os.path.basename(path) not in submitted_names
]

if pending_audio_paths:
    # Always work on the first file that is still pending.
    current_audio = pending_audio_paths[0]

    # Position within the whole group: files already handled by this user,
    # plus one for the file on screen.
    position = len(audio_paths) - len(pending_audio_paths) + 1
    st.subheader(f"🎧 Audio {position} sur {len(audio_paths)} : {os.path.basename(current_audio)}")

    audio_url = get_audio_url(current_audio)
    st.audio(audio_url)

    # Widget keys include the audio path, so each audio gets blank fields.
    with st.form(f"form_{current_audio}"):
        transcription = st.text_area("Transcription en mooré", key=f"tr_{current_audio}")
        traduction = st.text_area("Traduction en français", key=f"trad_{current_audio}")
        submitted = st.form_submit_button("💾 Soumettre")

    group_finished = False
    if submitted:
        # Only the save itself is guarded, so an unrelated failure is not
        # reported to the user as a failed save.
        try:
            save_result = save_annotation(
                audio_path=current_audio,
                user=username,
                transcription=transcription,
                traduction=traduction,
            )
        except Exception as e:
            st.error(f"Erreur lors de l'enregistrement: {e}")
        else:
            st.success("✅ Contribution enregistrée avec succès !")
            print(f"Résultat de sauvegarde: {save_result}")
            submitted_names.add(os.path.basename(current_audio))

            if len(pending_audio_paths) > 1:
                # More files remain: restart the script to show the next one.
                st.rerun()

            # That was the last pending file of the group for this user.
            group_finished = True
            st.success(f"🎉 Vous avez terminé tous les audios du groupe '{selected_title}'!")
            st.session_state.completed_titles.add(selected_title)
            save_title_completion_status(selected_title, True)

    # The button sits outside the form: st.button is not allowed inside one.
    if group_finished and st.button("Continuer avec un autre groupe"):
        st.rerun()
else:
    # Every unprocessed file was already submitted during this session.
    # Hide the group so the next run offers a different one.
    st.session_state.completed_titles.add(selected_title)
    st.info(f"Il ne reste plus d'audios à traiter pour le groupe '{selected_title}'.")
    if st.button("Choisir un autre groupe"):
        st.rerun()