"""Download the songs listed on a kuwo.cn chart page into a local folder.

The script fetches the chart page, extracts the per-song metadata embedded in
the ``data-music`` attribute of every ``div.tools`` element, resolves each
song id to an mp3 URL and saves the file under ``destination_folder``.
Files whose size equals ``INVALID_MUSIC_SIZE`` are deleted and their names are
recorded in ``__INVALID_MUSICS.txt`` so later runs skip them.

Configuration (environment variables):
    MUSIC_FOLDER: destination directory (default ``/nas/music/kuwo``).
    MUSIC_URL: chart page to scrape.
"""
import glob
import json
import os
import re
import string
import time
from contextlib import redirect_stdout
from dataclasses import dataclass
from io import StringIO
from urllib.parse import unquote

import requests
import urllib3
from bs4 import BeautifulSoup

# Every request below is made with verify=False, so silence the warning that
# urllib3 would otherwise print for each of them.
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

mp3_url_template = "https://antiserver.kuwo.cn/anti.s?type=convert_url&rid={}&format=mp3&response=url"
music_name_template = "{name}_{artist}_{album}_{id}.mp3"
destination_folder = os.getenv('MUSIC_FOLDER', "/nas/music/kuwo")
root_url = os.getenv("MUSIC_URL", "https://www.kuwo.cn/bang/content?name=%E6%8A%96%E9%9F%B3%E7%83%AD%E6%AD%8C%E6%A6%9C")
invalid_music_file_path = os.path.join(destination_folder, "__INVALID_MUSICS.txt")

# A downloaded file of exactly this many bytes is treated as an invalid song.
INVALID_MUSIC_SIZE = 181521

# Matches one or more consecutive "xNN" groups (NN = two hex digits).
_HEX_ESCAPE_RUN = re.compile(r"(?:x[0-9a-fA-F]{2})+")

# Make sure the destination folder and the invalid-music list exist before
# reading the list; previously a missing folder crashed the script here.
os.makedirs(destination_folder, exist_ok=True)
if not os.path.exists(invalid_music_file_path):
    with open(invalid_music_file_path, 'a'):
        pass
with open(invalid_music_file_path) as invalid_file:
    invalid_musics = {line.strip() for line in invalid_file}


@dataclass
class Music:
    """Metadata and download state of a single song.

    The first five fields come from the scraped page; the remaining ones are
    filled in by the script while it processes the song.
    """

    id: str
    artist: str
    album: str
    name: str
    pay: str
    url: str = ""             # resolved mp3 URL
    targeting_file: str = ""  # full path the song is saved to
    status: str = "pending"   # "pending", "success" or "fail"
    file_name: str = ""       # base name of targeting_file

    def exists(self):
        """Return True if the song is already on disk or known to be invalid."""
        return os.path.exists(self.targeting_file) or self.file_name in invalid_musics

    def download(self):
        """Fetch ``self.url`` and write it to ``self.targeting_file``.

        Returns:
            The resulting status: "success" on HTTP 200, otherwise "fail".

        Raises:
            requests.RequestException: if the request itself fails.
            OSError: if the target file cannot be written.
        """
        print("Checking url " + self.url)
        r = requests.get(self.url, verify=False, timeout=5)
        try:
            if r.status_code == 200:
                with open(self.targeting_file, 'wb') as f:
                    f.write(r.content)
                self.status = "success"
            else:
                self.status = "fail"
        finally:
            # Release the connection even when writing the file fails.
            r.close()
        return self.status


def _decode_hex_run(match):
    """Decode one run of ``xNN`` groups as UTF-8, or keep it if that fails."""
    run = match.group(0)
    try:
        return bytes.fromhex(run.replace("x", "")).decode("utf-8")
    except UnicodeDecodeError:
        # Not a byte sequence after all (e.g. the "xed" in "Maxed").
        return run


def convertToUtf8(str):
    """Turn ``xNN`` byte groups in *str* back into the text they encode.

    Every ``x`` followed by two hex digits is read as one byte; consecutive
    bytes are decoded as UTF-8. All other characters are kept as they are and
    newlines are removed from the result.

    This replaces an earlier implementation that assembled a bytes literal
    from the scraped text and ran it through ``exec``.

    Args:
        str: text in which backslash-less ``xNN`` groups stand for raw bytes.

    Returns:
        The decoded text. Runs that are not valid UTF-8 are left untouched.
    """
    return _HEX_ESCAPE_RUN.sub(_decode_hex_run, str).replace("\n", "")


def getMp3Url(url):
    """Ask the conversion endpoint at *url* for the real mp3 URL.

    Returns:
        The response body decoded as UTF-8 with surrounding whitespace
        stripped, or "" if the request or the decoding fails.
    """
    try:
        res = requests.get(url, verify=False, timeout=10)
        try:
            return res.content.decode('utf-8').strip()
        finally:
            res.close()
    except (requests.RequestException, UnicodeDecodeError) as exc:
        print("error to get mp3 url: {}".format(exc))
        return ""


def getMusicList(url, retry):
    """Download the chart page at *url*, retrying on request errors.

    Args:
        url: page to fetch.
        retry: number of retries already used; callers pass 0.

    Returns:
        The raw response body (bytes), or "" once the retry counter has
        exceeded 3 without a successful request.
    """
    attempt = retry
    while True:
        try:
            res = requests.get(url, verify=False, timeout=20)
            try:
                return res.content
            finally:
                res.close()
        except requests.RequestException:
            if attempt > 3:
                return ""
            time.sleep(1)
            attempt += 1
            print("unable to access the kuwo.cn. Retries: {}".format(attempt))


def _build_music(item):
    """Create a Music from one ``div.tools`` element, or None if unusable."""
    raw_payload = item.get("data-music")
    if raw_payload is None:
        return None
    # Quotes and backslashes left over from the bytes repr are dropped here;
    # convertToUtf8 later restores the bytes behind the remaining "xNN" groups.
    json_payload = raw_payload.replace("'", "").replace("\\", "")
    # NOTE(review): presumably the attribute value can arrive cut short; the
    # original code closes the JSON object with an empty "pay" in that case.
    if not json_payload.endswith("}"):
        json_payload += '", "pay":""}'
    try:
        music_data = json.loads(json_payload)
    except json.JSONDecodeError as exc:
        print("[Warn] Skipping an entry with unreadable metadata: {}".format(exc))
        return None
    if not isinstance(music_data, dict) or "id" not in music_data:
        print("[Warn] Skipping an entry without a music id")
        return None
    return Music(
        music_data["id"],
        convertToUtf8(music_data["artist"]) if "artist" in music_data else "",
        convertToUtf8(music_data["album"]) if "album" in music_data else "",
        convertToUtf8(music_data["name"]) if "name" in music_data else "",
        music_data.get("pay", ""),
    )


content = getMusicList(root_url, 0)
if content == "":
    print("Unable to access the music list. Exit")
    raise SystemExit(0)

# str() of the bytes body yields its repr, in which non-ASCII bytes show up as
# \xNN escapes; _build_music and convertToUtf8 are written against that form.
# NOTE(review): parsing the decoded body would be cleaner but would presumably
# change the derived file names - confirm before changing.
body = BeautifulSoup(str(content), "html.parser")
new_music_count = 0
for item in body.find_all("div", class_="tools"):
    music = _build_music(item)
    if music is None:
        continue

    music.file_name = music_name_template.format(
        name=music.name,
        artist=music.artist,
        album=music.album,
        id=music.id.replace("MUSIC_", ""),
    )
    music.targeting_file = "{}/{}".format(destination_folder, music.file_name)
    if music.exists():
        print("[Info] Skipping the music {}".format(music.targeting_file))
        continue

    new_music_count += 1
    music.url = getMp3Url(mp3_url_template.format(music.id))
    if music.url == "":
        continue

    try:
        music.download()
    except Exception as exc:
        # One failed song must not abort the whole run.
        music.status = "fail"
        print("error: {}".format(exc))
    print("[Info] Download music \"{}\" with the status: {}".format(music.file_name, music.status))

    # A file of exactly INVALID_MUSIC_SIZE bytes is an invalid song: remember
    # its name so it is skipped next time, and remove it from disk.
    if os.path.exists(music.targeting_file) and os.path.getsize(music.targeting_file) == INVALID_MUSIC_SIZE:
        with open(invalid_music_file_path, 'a') as the_file:
            the_file.write("{}\n".format(music.file_name))
        invalid_musics.add(music.file_name)
        os.remove(music.targeting_file)
        new_music_count -= 1
        print("[Info] Delete the music \"{}\" due to the invalid size".format(music.file_name))

print("[Summary] Found {} new musics".format(new_music_count))
# print("Processing the music")
# MusicPatcher().process_music()