import boto3
import json
import os
from dotenv import load_dotenv
import pandas as pd
from io import BytesIO
import soundfile as sf
from datetime import datetime, timezone

load_dotenv(".env")
S3_BUCKET = os.getenv("S3_BUCKET")
S3_PREFIX = os.getenv("S3_PREFIX")
AWS_ACCESS_KEY_ID = os.getenv("AWS_ACCESS_KEY_ID")
AWS_SECRET_ACCESS_KEY = os.getenv("AWS_SECRET_ACCESS_KEY")
ENDPOINT_URL = os.getenv("AWS_ENDPOINT_URL_S3")
ANNOTATIONS_PREFIX = "annotations"

s3 = boto3.client(
    "s3",
    aws_access_key_id=AWS_ACCESS_KEY_ID,
    aws_secret_access_key=AWS_SECRET_ACCESS_KEY,
    endpoint_url=ENDPOINT_URL,
)
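
# Illustrative .env layout assumed by the configuration above (placeholder
# values, not from the original project):
#
#   S3_BUCKET=my-audio-bucket
#   S3_PREFIX=audio
#   AWS_ACCESS_KEY_ID=...
#   AWS_SECRET_ACCESS_KEY=...
#   AWS_ENDPOINT_URL_S3=https://s3.<region>.amazonaws.com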
def list_audio_files_by_title():
    """Group audio files by title (the folder component of the key)."""
    grouped = {}
    # Paginate so the listing is not capped at the 1,000-key limit of a
    # single ListObjectsV2 call.
    paginator = s3.get_paginator("list_objects_v2")
    for page in paginator.paginate(Bucket=S3_BUCKET, Prefix=S3_PREFIX):
        for obj in page.get("Contents", []):
            key = obj["Key"]
            if not key.endswith(".wav"):
                continue
            # Keys are expected to look like "<prefix>/<title>/<file>.wav".
            parts = key.split("/")
            if len(parts) >= 3:
                title = parts[1]
                grouped.setdefault(title, []).append(key)
    return grouped
def get_audio_url(audio_path):
    """Generate a temporary (presigned) URL for listening to the audio."""
    return s3.generate_presigned_url(
        ClientMethod="get_object",
        Params={"Bucket": S3_BUCKET, "Key": audio_path},
        ExpiresIn=3600,
    )
def get_audio_duration_from_s3(bucket, key):
    """Return the duration (in seconds) of an audio file stored in S3."""
    try:
        obj = s3.get_object(Bucket=bucket, Key=key)
        audio_bytes = obj["Body"].read()
        with BytesIO(audio_bytes) as audio_buffer:
            y, sr = sf.read(audio_buffer)
        return len(y) / sr
    except Exception as e:
        print(f"Error while reading the duration of {key}: {e}")
        return 0.0
def save_annotation(audio_path, user, transcription, traduction):
    """Save the user's annotation to S3."""
    duration = get_audio_duration_from_s3(S3_BUCKET, audio_path)
    base_filename = os.path.basename(audio_path).replace(".wav", "")
    path_parts = audio_path.split("/")
    title = path_parts[-2]
    annotation_key = f"{ANNOTATIONS_PREFIX}/{title}/{base_filename}__{user}.json"
    payload = {
        "audio_path": audio_path,
        "user": user,
        "transcription": transcription,
        "traduction": traduction,
        "duration": duration,
        "created_at": datetime.now(timezone.utc).isoformat(),  # timezone-aware UTC timestamp
    }
    s3.put_object(
        Bucket=S3_BUCKET,
        Key=annotation_key,
        Body=json.dumps(payload, ensure_ascii=False).encode("utf-8"),
        ContentType="application/json",
    )
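
# Example of the resulting layout (illustrative paths and user, not from the
# original project): annotating "audio/album1/clip01.wav" as user "alice"
# writes the JSON payload above to "annotations/album1/clip01__alice.json".
# The "__<user>.json" suffix is what the per-user lookups below rely on.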
def get_total_audio_duration_by_user(username: str) -> float:
    """Compute the total duration (in minutes) of audio annotated by a user."""
    paginator = s3.get_paginator("list_objects_v2")
    total_seconds = 0.0
    for page in paginator.paginate(Bucket=S3_BUCKET, Prefix=ANNOTATIONS_PREFIX):
        for obj in page.get("Contents", []):
            key = obj["Key"]
            if not key.endswith(".json") or f"__{username}.json" not in key:
                continue
            try:
                file_obj = s3.get_object(Bucket=S3_BUCKET, Key=key)
                content = file_obj["Body"].read().decode("utf-8")
                data = json.loads(content)
                duration = data.get("duration")
                if duration:
                    total_seconds += float(duration)
            except Exception as e:
                print(f"Error while reading {key}: {e}")
                continue
    return total_seconds / 60.0
def get_processed_audio_files_by_user_and_title(username: str, title: str) -> set:
    """Return the set of audio filenames already processed by a user for a given title."""
    processed_files = set()
    prefix = f"{ANNOTATIONS_PREFIX}/{title}/"
    paginator = s3.get_paginator("list_objects_v2")
    for page in paginator.paginate(Bucket=S3_BUCKET, Prefix=prefix):
        for obj in page.get("Contents", []):
            key = obj["Key"]
            if key.endswith(f"__{username}.json"):
                # Map "clip__user.json" back to the original "clip.wav" filename.
                filename_with_ext = key.split("/")[-1].replace(f"__{username}.json", ".wav")
                processed_files.add(filename_with_ext)
    return processed_files
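

# Illustrative usage sketch (not part of the original module). "alice" is a
# placeholder username; the functions and bucket layout are those defined above.
if __name__ == "__main__":
    grouped = list_audio_files_by_title()
    for title, keys in grouped.items():
        done = get_processed_audio_files_by_user_and_title("alice", title)
        remaining = [k for k in keys if os.path.basename(k) not in done]
        print(f"{title}: {len(remaining)} clip(s) left to annotate")
        if remaining:
            # A presigned URL lets an annotation UI stream the clip directly.
            print("  first clip:", get_audio_url(remaining[0]))
    minutes = get_total_audio_duration_by_user("alice")
    print(f"alice has annotated {minutes:.1f} minutes of audio")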