Transliterate / pages /1_🎧_Transcriptions.py
sawadogosalif's picture
persist finhsi job
bccfcc4
import streamlit as st
from urllib.parse import unquote
import os
import json
from utils.utils_trad import get_total_audio_duration_by_user, list_audio_files_by_title, get_processed_audio_files_by_user_and_title, get_audio_url, save_annotation
from dotenv import load_dotenv
# Read the local .env file first so the lookups below can see its values.
load_dotenv(".env")

# S3 connection settings, all taken from the environment.
S3_BUCKET = os.environ.get("S3_BUCKET")
S3_PREFIX = os.environ.get("S3_PREFIX")
AWS_ACCESS_KEY_ID = os.environ.get("AWS_ACCESS_KEY_ID")
AWS_SECRET_ACCESS_KEY = os.environ.get("AWS_SECRET_ACCESS_KEY")
ENDPOINT_URL = os.environ.get("AWS_ENDPOINT_URL_S3")
# Folder (inside the bucket) under which annotation files are looked up.
ANNOTATIONS_PREFIX = "annotations"

import s3fs

# Shared filesystem handle used by the status helpers and the page below.
fs = s3fs.S3FileSystem(
    key=AWS_ACCESS_KEY_ID,
    secret=AWS_SECRET_ACCESS_KEY,
    client_kwargs={"endpoint_url": ENDPOINT_URL},
)

# Refuse to render the page when any setting is missing or empty.
_required_settings = (
    S3_BUCKET,
    S3_PREFIX,
    AWS_ACCESS_KEY_ID,
    AWS_SECRET_ACCESS_KEY,
    ENDPOINT_URL,
)
if any(not value for value in _required_settings):
    st.error("Veuillez configurer correctement les variables d'environnement S3.")
    st.stop()
def get_completed_titles():
    """Return the titles that have no audio left to process.

    Reads the JSON status file stored in the bucket and keeps every title
    whose stored flag is truthy.

    Returns:
        A list of title names. The list is empty when the status file does
        not exist or does not contain valid JSON.
    """
    status_file = f"{S3_BUCKET}/title_completion_status.json"
    try:
        with fs.open(status_file, 'r') as f:
            status = json.load(f)
    except (FileNotFoundError, json.JSONDecodeError):
        return []

    completed = []
    for title, is_completed in status.items():
        if is_completed:
            completed.append(title)
    return completed
def save_title_completion_status(title, is_completed):
    """Persist the processing state of a title in the shared JSON status file.

    The current content of the status file is read, the entry for ``title``
    is set to ``is_completed`` and the whole mapping is written back.

    A missing file is treated as an empty mapping, and a file that is not
    valid JSON is reported and replaced. Any other read failure aborts the
    update: writing back a mapping rebuilt from nothing would erase the flags
    already stored for every other title.

    Args:
        title: Title whose state is being recorded.
        is_completed: Flag stored for that title.

    Returns:
        None. Problems are reported through Streamlit messages rather than
        raised.
    """
    status_file = f"{S3_BUCKET}/title_completion_status.json"

    try:
        with fs.open(status_file, 'r') as f:
            status = json.load(f)
    except FileNotFoundError:
        # First write: there is nothing to merge with yet.
        status = {}
    except json.JSONDecodeError as e:
        # The stored content is unusable, so starting over loses nothing.
        st.warning(f"Erreur lors de la lecture du statut: {e}")
        status = {}
    except Exception as e:
        # The file may exist and be valid; do not overwrite it blindly.
        st.error(f"Erreur lors de la lecture du statut, sauvegarde annulée: {e}")
        return

    status[title] = is_completed
    print(f"Mise à jour du statut pour {title}: {is_completed}")
    print(f"Statut complet: {status}")

    try:
        with fs.open(status_file, 'w') as f:
            json.dump(status, f)
    except Exception as e:
        st.error(f"Erreur lors de la sauvegarde du statut: {e}")
    else:
        print(f"Statut sauvegardé avec succès dans {status_file}")
# Page-level configuration and the static introduction shown on every run.
st.set_page_config(layout="wide", page_title="Travaux Audio")
st.title("🗣️ Travaux Audio - Transcription & Traduction")

intro_markdown = """
Bienvenue sur la page des **Travaux Audio** du projet **MooreFrCollection**.
> 📝 Votre mission : écouter les audios mooré, écrire leur **transcription** (en mooré) et leur **traduction** (en français).
"""
st.markdown(intro_markdown)
# Make sure the session keys used by the rest of the page exist.
for _state_key, _state_default in (
    ("user_logged_in", False),
    ("current_username", ""),
    ("completed_titles", set()),
):
    if _state_key not in st.session_state:
        st.session_state[_state_key] = _state_default

# Login gate: nothing below this block runs until a name has been submitted.
if not st.session_state.user_logged_in:
    with st.form("login_form"):
        input_username = st.text_input(
            "Entrez votre nom ou pseudo pour contribuer :",
            key="input_username",
        )
        submit_button = st.form_submit_button("✅ Commencer à contribuer")

        if submit_button:
            # A name made only of whitespace is as unusable as an empty one.
            if not input_username or not input_username.strip():
                st.error("Merci d'entrer un nom avant de continuer.")
            else:
                st.session_state.user_logged_in = True
                # Stored exactly as typed so existing contributors keep the
                # same name.
                st.session_state.current_username = input_username
                st.rerun()
    st.stop()
# Show who is connected and how much audio they have processed so far.
username = st.session_state.current_username
st.success(f"👤 Connecté en tant que: **{username}**")

user_duration_minutes = get_total_audio_duration_by_user(username)
st.info(f"🎯 Vous avez déjà traité environ **{user_duration_minutes:.1f} minutes** d'audio.")

if st.button("👋 Changer d'utilisateur"):
    st.session_state.user_logged_in = False
    st.session_state.current_username = ""
    # The set of finished groups and the per-group positions are built from
    # the current user's progress; drop them so the next user does not
    # inherit hidden groups or a stale position.
    st.session_state.completed_titles = set()
    for _state_key in list(st.session_state.keys()):
        if str(_state_key).startswith("index_"):
            del st.session_state[_state_key]
    st.rerun()
# The listing of audio groups is fetched once per session and reused on
# later reruns.
if "audio_titles" not in st.session_state:
    st.session_state.audio_titles = list_audio_files_by_title()
audio_titles = st.session_state.audio_titles

if not audio_titles:
    st.warning("Aucun audio disponible pour l'instant.")
    st.stop()

globally_completed_titles = get_completed_titles()
print(f"Titres globalement terminés: {globally_completed_titles}")

# Keep only the groups that are neither finished in this session nor flagged
# as finished in the shared status file.
available_titles = []
for title in audio_titles:
    if title in st.session_state.completed_titles:
        continue
    if title in globally_completed_titles:
        continue
    available_titles.append(title)

if not available_titles:
    st.success("🎉 Félicitations ! Tous les groupes d'audio disponibles sont terminés.")
    st.stop()
# Audio group selection: pre-select the group chosen on the previous run when
# it is still available, otherwise fall back to the first entry.
default_index = 0
if "selected_title" in st.session_state:
    try:
        default_index = available_titles.index(st.session_state["selected_title"])
    except ValueError:
        default_index = 0

selected_title = st.selectbox(
    "Choisissez un groupe audio :",
    available_titles,
    index=default_index,
    key="audio_group",
)
# Remember the choice for the next run.
st.session_state["selected_title"] = selected_title
# Work out which audio files of the selected group this user still has to do.
audio_paths = audio_titles[selected_title]
processed_files = get_processed_audio_files_by_user_and_title(username, selected_title)
print(f"Fichiers déjà traités pour {username} et {selected_title}: {processed_files}")

unprocessed_audio_paths = []
for path in audio_paths:
    if os.path.basename(path) not in processed_files:
        unprocessed_audio_paths.append(path)
print(f"Fichiers non traités: {len(unprocessed_audio_paths)} sur {len(audio_paths)}")

if not unprocessed_audio_paths:
    st.success(f"🎉 Vous avez déjà terminé tous les audios du groupe '{selected_title}'!")
    st.session_state.completed_titles.add(selected_title)

    # The group is flagged as finished for everyone only when an annotation
    # file exists for each of its audio files; stop at the first one missing.
    for audio_path in audio_paths:
        audio_filename = os.path.basename(audio_path)
        annotation_path = f"{S3_BUCKET}/{ANNOTATIONS_PREFIX}/{selected_title}/{audio_filename}.json"
        if not fs.exists(annotation_path):
            print(f"Fichier non annoté: {annotation_path}")
            all_files_processed = False
            break
    else:
        all_files_processed = True

    if all_files_processed:
        print(f"Tous les fichiers du titre {selected_title} sont annotés")
        save_title_completion_status(selected_title, True)

    if st.button("Continuer avec un autre groupe (Terminé)"):
        st.rerun()
    st.stop()
# Audio files this user has submitted for this group during the current
# session. `unprocessed_audio_paths` is recomputed on every rerun, so if it
# already excludes a file that was just saved, advancing a separate index as
# well skips the next audio and can mark the group finished while files are
# still unannotated. Removing submitted files from the queue here and always
# serving the first remaining one avoids that, whether or not the recomputed
# list already reflects the submission.
submitted_key = f"submitted_{username}_{selected_title}"
if submitted_key not in st.session_state:
    st.session_state[submitted_key] = set()
submitted_paths = st.session_state[submitted_key]

remaining_audio_paths = [
    path for path in unprocessed_audio_paths if path not in submitted_paths
]

# Becomes True in the run where the last remaining audio has just been saved.
group_finished = False

if remaining_audio_paths:
    current_audio = remaining_audio_paths[0]
    # Zero-based position of the current audio within the whole group.
    current_index = len(audio_paths) - len(remaining_audio_paths)
    st.subheader(f"🎧 Audio {current_index + 1} sur {len(audio_paths)} : {os.path.basename(current_audio)}")

    audio_url = get_audio_url(current_audio)
    st.audio(audio_url)

    # Widget keys include the audio path so each audio gets fresh fields.
    with st.form(f"form_{current_audio}"):
        transcription = st.text_area("Transcription en mooré", key=f"tr_{current_audio}")
        traduction = st.text_area("Traduction en français", key=f"trad_{current_audio}")
        submitted = st.form_submit_button("💾 Soumettre")

    if submitted:
        try:
            save_result = save_annotation(
                audio_path=current_audio,
                user=username,
                transcription=transcription,
                traduction=traduction,
            )
        except Exception as e:
            # Nothing is recorded as submitted, so the same audio is shown again.
            st.error(f"Erreur lors de l'enregistrement: {e}")
        else:
            st.success("✅ Contribution enregistrée avec succès !")
            print(f"Résultat de sauvegarde: {save_result}")
            st.session_state[submitted_key] = submitted_paths | {current_audio}

            # The audio just saved was the only one left in the queue.
            group_finished = len(remaining_audio_paths) == 1
            if group_finished:
                st.success(f"🎉 Vous avez terminé tous les audios du groupe '{selected_title}'!")
                st.session_state.completed_titles.add(selected_title)
                save_title_completion_status(selected_title, True)
            else:
                st.rerun()

    if group_finished and st.button("Continuer avec un autre groupe"):
        st.rerun()
else:
    # Everything still listed as unprocessed was already submitted in this
    # session.
    st.info(f"Il ne reste plus d'audios à traiter pour le groupe '{selected_title}'.")
    if st.button("Choisir un autre groupe"):
        st.rerun()