"""Utility helpers for downloading audio files and embedding metadata.

At import time the module reads its service configuration from environment
variables and terminates the process if any of them is missing.
"""

import os
import re
import shutil
from datetime import datetime
from typing import Optional
from urllib.parse import urlsplit
from zoneinfo import ZoneInfo

import requests
from mutagen.flac import FLAC, Picture
from mutagen.id3 import APIC, TALB, TIT2, TPE1, USLT
from mutagen.mp3 import MP3
from tqdm import tqdm
from tzlocal import get_localzone

# True unless the LANG environment variable is exactly "zh_CN.UTF-8".
EN_US = os.getenv("LANG") != "zh_CN.UTF-8"

# Service configuration, all supplied through the environment.
API_163 = os.getenv("api_music163")
API_KUWO = os.getenv("api_kuwo")
API_QQ_2 = os.getenv("api_qmusic_2")
API_QQ_1 = os.getenv("api_qmusic_1")
KEY_QQ_1 = os.getenv("apikey_qmusic_1")
if not (API_163 and API_KUWO and API_QQ_2 and API_QQ_1 and KEY_QQ_1):
    print("请检查环境变量!")
    exit()

# Timeout handed to ``requests``; ``None`` means wait indefinitely.
TIMEOUT = None
TMP_DIR = "./__pycache__"
HEADER = {
    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/53.0.2785.116 Safari/537.36"
}


def timestamp(
    naive_time: Optional[datetime] = None,
    target_tz=ZoneInfo("Asia/Shanghai"),
):
    """Format a moment in time as ``YYYY-MM-DD HH:MM:SS`` in *target_tz*.

    Args:
        naive_time: Datetime to format; defaults to the current time. A naive
            value is interpreted as local time. An aware value keeps its own
            zone (the original code overwrote it with the local zone, which
            shifted the result).
        target_tz: Time zone of the returned string.

    Returns:
        The formatted timestamp string.
    """
    if not naive_time:
        naive_time = datetime.now()

    if naive_time.tzinfo is None:
        moment = naive_time.replace(tzinfo=get_localzone())
    else:
        moment = naive_time

    return moment.astimezone(target_tz).strftime("%Y-%m-%d %H:%M:%S")


def _load_cover(cover):
    """Return cover image bytes from a URL or local path, or ``b""`` if none."""
    if not cover:
        return b""

    if cover.startswith("http"):
        response = requests.get(cover, timeout=TIMEOUT)
        # Skip the cover on an HTTP error rather than embedding the error
        # page as if it were image data.
        return response.content if response.ok else b""

    with open(cover, "rb") as file:
        return file.read()


def insert_meta(audio_in: str, title, artist, album, lyric, cover: str, audio_out=""):
    """Write title, artist, album, lyrics and cover art into an audio file.

    Only ``.mp3`` and ``.flac`` files (matched case-insensitively) are tagged;
    any other extension is left untouched.

    Args:
        audio_in: Path of the source audio file.
        title: Track title.
        artist: Track artist.
        album: Album name.
        lyric: Lyrics text.
        cover: URL (starting with ``http``) or local path of the cover image;
            a falsy value means no cover.
        audio_out: Optional destination path. When given, *audio_in* is copied
            there and the copy is tagged; otherwise *audio_in* is modified in
            place.

    Returns:
        The path of the file that was tagged.
    """
    if audio_out:
        shutil.copyfile(audio_in, audio_out)
    else:
        audio_out = audio_in

    cover_data = _load_cover(cover)
    lowered = audio_out.lower()

    if lowered.endswith(".mp3"):
        audio = MP3(audio_out)
        # ``add_tags`` raises if a tag object already exists, even an empty
        # one, so test for None explicitly instead of truthiness.
        if audio.tags is None:
            audio.add_tags()
        else:
            for tag_id in list(audio.tags.keys()):
                del audio.tags[tag_id]

        audio.tags.add(TIT2(encoding=3, text=title))
        audio.tags.add(TPE1(encoding=3, text=artist))
        audio.tags.add(TALB(encoding=3, text=album))
        audio.tags.add(USLT(encoding=3, desc="Lyrics", text=lyric))
        if cover_data:
            audio.tags.add(
                APIC(
                    encoding=3,  # UTF-8 encoding
                    mime="image/jpeg",  # MIME type of the image
                    type=3,  # front cover
                    desc="Cover",
                    data=cover_data,
                )
            )

        audio.save()

    elif lowered.endswith(".flac"):
        audio = FLAC(audio_out)
        # A FLAC file without a Vorbis comment block has ``tags is None``.
        if audio.tags is None:
            audio.add_tags()

        audio.tags["TITLE"] = title
        audio.tags["ARTIST"] = artist
        audio.tags["ALBUM"] = album
        audio.tags["LYRICS"] = lyric
        audio.clear_pictures()
        if cover_data:
            picture = Picture()
            picture.type = 3  # front cover
            picture.mime = "image/jpeg"
            picture.data = cover_data
            audio.add_picture(picture)

        audio.save()

    return audio_out


def extract_fst_url(text):
    """Return the first http(s) URL found in *text*, or ``None``."""
    match = re.search(r'(https?://[^\s"]+)', text)
    return match.group(1) if match else None


def extract_fst_int(text: str):
    """Return the first run of digits in *text* as a normalized string.

    Leading zeros are dropped because the digits pass through ``int``.
    Returns ``None`` when *text* contains no digit.
    """
    match = re.search(r"\d+", text)
    return str(int(match.group())) if match else None


def get_real_url(short_url):
    """Follow redirects from *short_url* and return the final URL."""
    return requests.get(
        short_url,
        headers=HEADER,
        allow_redirects=True,
        timeout=TIMEOUT,
    ).url


def mk_dir(dirpath: str):
    """Create *dirpath* (including parents) if it does not exist yet."""
    # exist_ok avoids the check-then-create race of a separate exists() test.
    os.makedirs(dirpath, exist_ok=True)


def rm_dir(dirpath: str):
    """Recursively delete *dirpath* if it exists."""
    if os.path.exists(dirpath):
        shutil.rmtree(dirpath)


def clean_dir(dirpath: str):
    """Recreate *dirpath* as an empty directory."""
    rm_dir(dirpath)
    os.makedirs(dirpath)


def download_file(
    id: int,
    url: str,
    title: str,
    artist: str,
    album: str,
    lyric: str,
    cover: str,
    cache: str,
):
    """Download an audio file into *cache* and embed its metadata.

    The cache directory is emptied first. The file is saved as
    ``{cache}/{id}.{ext}``, where ``ext`` comes from the path component of
    *url*, so a query string or fragment no longer leaks into the file name.

    Args:
        id: Identifier used as the local file's base name.
        url: Direct URL of the audio file.
        title: Track title to embed.
        artist: Track artist to embed.
        album: Album name to embed.
        lyric: Lyrics text to embed.
        cover: Cover image URL or local path (falsy for none).
        cache: Directory that receives the download; wiped beforehand.

    Returns:
        The path of the downloaded and tagged file.

    Raises:
        ConnectionError: If the server answers with a status other than 200.
    """
    clean_dir(cache)

    ext = os.path.splitext(urlsplit(url).path)[1].lstrip(".")
    local_file = f"{cache}/{id}.{ext}" if ext else f"{cache}/{id}"

    with requests.get(url, stream=True, timeout=TIMEOUT) as response:
        if response.status_code != 200:
            raise ConnectionError(f"下载失败,状态码:{response.status_code}")

        # Without a Content-Length header the size is unknown; None makes
        # tqdm show a plain byte counter instead of a bogus percentage.
        length = response.headers.get("Content-Length")
        total_size = int(length) if length else None
        time_stamp = timestamp()

        with tqdm(
            total=total_size,
            unit="B",
            unit_scale=True,
            desc=f"[{time_stamp}] {local_file}",
        ) as progress_bar, open(local_file, "wb") as f:
            for chunk in response.iter_content(chunk_size=8192):
                if chunk:  # skip empty chunks
                    f.write(chunk)
                    progress_bar.update(len(chunk))

    return insert_meta(local_file, title, artist, album, lyric, cover)