music_parsers / utils.py
admin
sync ms
65004e0
raw
history blame
4.57 kB
import os
import re
import shutil
import requests
from tqdm import tqdm
from datetime import datetime
from zoneinfo import ZoneInfo
from tzlocal import get_localzone
from mutagen.mp3 import MP3
from mutagen.flac import FLAC, Picture
from mutagen.id3 import TIT2, TPE1, TALB, USLT, APIC
EN_US = os.getenv("LANG") != "zh_CN.UTF-8"
API_163 = os.getenv("api_music163")
API_KUWO = os.getenv("api_kuwo")
API_QQ_2 = os.getenv("api_qmusic_2")
API_QQ_1 = os.getenv("api_qmusic_1")
KEY_QQ_1 = os.getenv("apikey_qmusic_1")
if not (API_163 and API_KUWO and API_QQ_2 and API_QQ_1 and KEY_QQ_1):
print("请检查环境变量!")
exit()
TIMEOUT = None
TMP_DIR = "./__pycache__"
HEADER = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/53.0.2785.116 Safari/537.36"
}
def timestamp(naive_time: datetime = None, target_tz=ZoneInfo("Asia/Shanghai")):
if not naive_time:
naive_time = datetime.now()
local_tz = get_localzone()
aware_local = naive_time.replace(tzinfo=local_tz)
return aware_local.astimezone(target_tz).strftime("%Y-%m-%d %H:%M:%S")
def insert_meta(audio_in: str, title, artist, album, lyric, cover: str, audio_out=""):
if audio_out:
shutil.copyfile(audio_in, audio_out)
else:
audio_out = audio_in
if cover:
if cover.startswith("http"):
cover = requests.get(cover).content
else:
with open(cover, "rb") as file:
cover = file.read()
if audio_out.endswith(".mp3"):
audio = MP3(audio_out)
if audio.tags:
for tag_id in list(audio.tags.keys()):
del audio.tags[tag_id]
else:
audio.add_tags()
audio.tags.add(TIT2(encoding=3, text=title))
audio.tags.add(TPE1(encoding=3, text=artist))
audio.tags.add(TALB(encoding=3, text=album))
audio.tags.add(USLT(encoding=3, desc="Lyrics", text=lyric))
if cover:
audio.tags.add(
APIC(
encoding=3, # UTF-8 编码
mime="image/jpeg", # 图片的 MIME 类型
type=3, # 封面图片
desc="Cover",
data=cover,
)
)
audio.save()
elif audio_out.endswith(".flac"):
audio = FLAC(audio_out)
audio.tags["TITLE"] = title
audio.tags["ARTIST"] = artist
audio.tags["ALBUM"] = album
audio.tags["LYRICS"] = lyric
audio.clear_pictures()
if cover:
picture = Picture()
picture.type = 3 # 封面图片
picture.mime = "image/jpeg"
picture.data = cover
audio.add_picture(picture)
audio.save()
return audio_out
def extract_fst_url(text):
match = re.search(r'(https?://[^\s"]+)', text)
if match:
return match.group(1)
else:
return None
def extract_fst_int(text: str):
match = re.search(r"\d+", text)
if match:
return str(int(match.group()))
else:
return None
def get_real_url(short_url):
return requests.get(
short_url,
headers=HEADER,
allow_redirects=True,
timeout=TIMEOUT,
).url
def mk_dir(dirpath: str):
if not os.path.exists(dirpath):
os.makedirs(dirpath)
def rm_dir(dirpath: str):
if os.path.exists(dirpath):
shutil.rmtree(dirpath)
def clean_dir(dirpath: str):
rm_dir(dirpath)
os.makedirs(dirpath)
def download_file(
id: int,
url: str,
title: str,
artist: str,
album: str,
lyric: str,
cover: str,
cache: str,
):
clean_dir(cache)
fmt = url.split(".")[-1]
local_file = f"{cache}/{id}.{fmt}"
response = requests.get(url, stream=True)
if response.status_code == 200:
total_size = int(response.headers.get("Content-Length", 0)) + 1
time_stamp = timestamp()
progress_bar = tqdm(
total=total_size,
unit="B",
unit_scale=True,
desc=f"[{time_stamp}] {local_file}",
)
with open(local_file, "wb") as f:
for chunk in response.iter_content(chunk_size=8192):
if chunk: # 确保 chunk 不为空
f.write(chunk) # 更新进度条
progress_bar.update(len(chunk))
return insert_meta(local_file, title, artist, album, lyric, cover)
else:
raise ConnectionError(f"下载失败,状态码:{response.status_code}")