Spaces:
Running
Running
import os | |
import re | |
import shutil | |
import requests | |
from tqdm import tqdm | |
from datetime import datetime | |
from zoneinfo import ZoneInfo | |
from tzlocal import get_localzone | |
from mutagen.mp3 import MP3 | |
from mutagen.flac import FLAC, Picture | |
from mutagen.id3 import TIT2, TPE1, TALB, USLT, APIC | |
# Runtime configuration, read once when the module is imported.

# True unless the LANG environment variable is exactly "zh_CN.UTF-8".
EN_US = os.getenv("LANG") != "zh_CN.UTF-8"

# Values supplied through environment variables (presumably upstream API
# endpoints and an API key -- confirm against the deployment settings).
API_163 = os.getenv("api_music163")
API_KUWO = os.getenv("api_kuwo")
API_QQ_2 = os.getenv("api_qmusic_2")
API_QQ_1 = os.getenv("api_qmusic_1")
KEY_QQ_1 = os.getenv("apikey_qmusic_1")

# Stop right away when any of them is unset or empty; the printed message
# reads "Please check the environment variables!".
if not all((API_163, API_KUWO, API_QQ_2, API_QQ_1, KEY_QQ_1)):
    print("请检查环境变量!")
    exit()

# Passed as `timeout=` to requests; None means requests waits indefinitely.
TIMEOUT = None
TMP_DIR = "./__pycache__"
# Request headers carrying a desktop-browser User-Agent string.
HEADER = {
    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/53.0.2785.116 Safari/537.36"
}
def timestamp(naive_time: datetime = None, target_tz=ZoneInfo("Asia/Shanghai")):
    """Format a moment in time as ``YYYY-MM-DD HH:MM:SS`` expressed in *target_tz*.

    Args:
        naive_time: The moment to format. A naive datetime is interpreted as
            wall-clock time in the machine's local zone. An aware datetime
            keeps the zone it already carries. When omitted (or falsy) the
            current local time is used.
        target_tz: Time zone the result is expressed in.

    Returns:
        The formatted timestamp string.
    """
    if not naive_time:
        naive_time = datetime.now()

    if naive_time.tzinfo is None:
        # Only naive values get the local zone attached. Replacing the tzinfo
        # of an already-aware datetime would silently shift the instant it
        # represents.
        naive_time = naive_time.replace(tzinfo=get_localzone())

    return naive_time.astimezone(target_tz).strftime("%Y-%m-%d %H:%M:%S")
def insert_meta(audio_in: str, title, artist, album, lyric, cover: str, audio_out=""):
    """Write title, artist, album, lyrics and cover art into an MP3 or FLAC file.

    Args:
        audio_in: Path of the source audio file.
        title: Track title.
        artist: Artist name.
        album: Album name.
        lyric: Lyrics text.
        cover: Cover image, either a URL starting with ``http`` or a local
            file path. A falsy value means "no cover".
        audio_out: Optional destination path. When given, ``audio_in`` is
            copied there and the copy is tagged; otherwise ``audio_in`` is
            tagged in place.

    Returns:
        The path of the tagged file. Files whose extension is neither
        ``.mp3`` nor ``.flac`` are returned without being modified.
    """
    if audio_out:
        shutil.copyfile(audio_in, audio_out)
    else:
        audio_out = audio_in

    cover_data = None
    if cover:
        if cover.startswith("http"):
            response = requests.get(cover, timeout=TIMEOUT)
            # An HTTP error body is not an image; tag without a cover rather
            # than embedding the error page as artwork.
            cover_data = response.content if response.ok else None
        else:
            with open(cover, "rb") as file:
                cover_data = file.read()

    # Label PNG artwork correctly; everything else keeps the JPEG default.
    cover_mime = "image/jpeg"
    if cover_data and cover_data.startswith(b"\x89PNG\r\n\x1a\n"):
        cover_mime = "image/png"

    # Compare extensions case-insensitively so ".MP3" / ".FLAC" are tagged too.
    lowered_path = audio_out.lower()

    if lowered_path.endswith(".mp3"):
        audio = MP3(audio_out)
        # Test for None, not truthiness: an existing but empty ID3 tag is
        # falsy, and add_tags() raises when a tag is already present.
        if audio.tags is None:
            audio.add_tags()
        else:
            for tag_id in list(audio.tags.keys()):
                del audio.tags[tag_id]

        audio.tags.add(TIT2(encoding=3, text=title))
        audio.tags.add(TPE1(encoding=3, text=artist))
        audio.tags.add(TALB(encoding=3, text=album))
        audio.tags.add(USLT(encoding=3, desc="Lyrics", text=lyric))
        if cover_data:
            audio.tags.add(
                APIC(
                    encoding=3,  # UTF-8 text encoding
                    mime=cover_mime,  # MIME type of the image
                    type=3,  # front cover
                    desc="Cover",
                    data=cover_data,
                )
            )

        audio.save()

    elif lowered_path.endswith(".flac"):
        audio = FLAC(audio_out)
        # A FLAC file without a Vorbis comment block has tags == None.
        if audio.tags is None:
            audio.add_tags()

        audio.tags["TITLE"] = title
        audio.tags["ARTIST"] = artist
        audio.tags["ALBUM"] = album
        audio.tags["LYRICS"] = lyric
        audio.clear_pictures()
        if cover_data:
            picture = Picture()
            picture.type = 3  # front cover
            picture.mime = cover_mime
            picture.data = cover_data
            audio.add_picture(picture)

        audio.save()

    return audio_out
def extract_fst_url(text):
    """Return the first http(s) URL found in *text*, or None when there is none.

    The URL runs up to the next whitespace character or double quote.
    """
    hit = re.search(r'(https?://[^\s"]+)', text)
    return hit.group(1) if hit else None
def extract_fst_int(text: str):
    """Return the first run of digits in *text* as a normalized decimal string.

    The digits are round-tripped through ``int``, so leading zeros are
    dropped. Returns None when *text* contains no digit.
    """
    digits = re.search(r"\d+", text)
    if digits is None:
        return None

    number = int(digits.group())
    return str(number)
def get_real_url(short_url):
    """Request *short_url*, following redirects, and return the final URL."""
    request_options = {
        "headers": HEADER,
        "allow_redirects": True,
        "timeout": TIMEOUT,
    }
    response = requests.get(short_url, **request_options)
    return response.url
def mk_dir(dirpath: str):
    """Create *dirpath* (and any missing parents) if it does not exist yet.

    Calling it on an existing directory is a no-op.

    Raises:
        FileExistsError: If *dirpath* exists but is not a directory.
    """
    # exist_ok avoids the check-then-create race of testing os.path.exists
    # first: another process may create the directory between the two calls.
    os.makedirs(dirpath, exist_ok=True)
def rm_dir(dirpath: str):
    """Recursively delete *dirpath*; do nothing when the path does not exist."""
    if not os.path.exists(dirpath):
        return

    shutil.rmtree(dirpath)
def clean_dir(dirpath: str):
    """Leave *dirpath* as an existing, empty directory.

    Any previous content at that path is removed first, then the directory
    (with missing parents) is created afresh.
    """
    already_there = os.path.exists(dirpath)
    if already_there:
        shutil.rmtree(dirpath)

    os.makedirs(dirpath)
def download_file(
    id: int,
    url: str,
    title: str,
    artist: str,
    album: str,
    lyric: str,
    cover: str,
    cache: str,
):
    """Download an audio file into *cache* and write its metadata tags.

    The cache directory is emptied first. The file is stored as
    ``{cache}/{id}.{ext}``, where ``ext`` comes from the URL.

    Args:
        id: Identifier used as the local file name.
        url: Download URL of the audio file.
        title: Track title passed on to ``insert_meta``.
        artist: Artist name passed on to ``insert_meta``.
        album: Album name passed on to ``insert_meta``.
        lyric: Lyrics text passed on to ``insert_meta``.
        cover: Cover image URL or path passed on to ``insert_meta``.
        cache: Directory that receives the download; its previous content
            is deleted.

    Returns:
        The path returned by ``insert_meta`` for the downloaded file.

    Raises:
        ConnectionError: If the server answers with a status other than 200.
    """
    from urllib.parse import urlparse

    clean_dir(cache)

    # Take the extension from the URL *path* so a query string such as
    # "song.mp3?token=..." does not end up in the file name. Fall back to the
    # plain split when the path carries no extension.
    fmt = os.path.splitext(urlparse(url).path)[1].lstrip(".") or url.split(".")[-1]
    local_file = f"{cache}/{id}.{fmt}"

    # The context manager releases the streamed connection on every exit path.
    with requests.get(url, stream=True, timeout=TIMEOUT) as response:
        if response.status_code != 200:
            raise ConnectionError(f"下载失败,状态码:{response.status_code}")

        # Unknown length -> total=None, so tqdm shows a plain byte counter
        # instead of a bar that can never reach 100%.
        content_length = response.headers.get("Content-Length")
        total_size = int(content_length) if content_length else None
        time_stamp = timestamp()

        with open(local_file, "wb") as f, tqdm(
            total=total_size,
            unit="B",
            unit_scale=True,
            desc=f"[{time_stamp}] {local_file}",
        ) as progress_bar:
            for chunk in response.iter_content(chunk_size=8192):
                if chunk:  # skip empty chunks
                    f.write(chunk)
                    progress_bar.update(len(chunk))  # advance the progress bar

    return insert_meta(local_file, title, artist, album, lyric, cover)