import requests
from bs4 import BeautifulSoup
import re
from urllib.parse import unquote
import os
import time
import glob
from dataclasses import dataclass
import json
from io import StringIO
from contextlib import redirect_stdout
import string
import urllib3
# Requests in this script are made with verify=False; silence the resulting
# per-request InsecureRequestWarning noise.
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

# Endpoint that resolves a track id to a direct MP3 URL ({} is the track id).
mp3_url_template = "https://antiserver.kuwo.cn/anti.s?type=convert_url&rid={}&format=mp3&response=url"
# File-name pattern used for every downloaded track.
music_name_template = "{name}_{artist}_{album}_{id}.mp3"
# Both locations can be overridden through environment variables.
destination_folder = os.getenv('MUSIC_FOLDER', "/nas/music/kuwo")
root_url = os.getenv("MUSIC_URL", "https://www.kuwo.cn/bang/content?name=%E6%8A%96%E9%9F%B3%E7%83%AD%E6%AD%8C%E6%A6%9C")

# Text file holding one file name per line for tracks already found to be
# invalid, so they are skipped on later runs.
invalid_music_file_path = os.path.join(destination_folder, "__INVALID_MUSICS.txt")
if not os.path.exists(invalid_music_file_path):
    # Create an empty list file on first run.
    open(invalid_music_file_path, 'a').close()

# Read the list through a context manager so the handle is closed promptly
# instead of being left to the garbage collector.
with open(invalid_music_file_path) as invalid_music_file:
    invalid_musics = {line.strip() for line in invalid_music_file}
@dataclass
class Music:
    """One track taken from the chart page, plus its download state.

    The first five fields are supplied by the caller at construction time (in
    this positional order); the remaining fields default to empty/pending and
    are filled in before ``download()`` is called.
    """

    id: str      # track id as found in the page payload
    artist: str
    album: str
    name: str
    pay: str     # "pay" value from the payload; only stored here
    url: str = ""             # direct MP3 URL to fetch
    targeting_file: str = ""  # full path the MP3 is written to
    status: str = "pending"   # "pending", "success" or "fail"
    file_name: str = ""       # bare file name, also used in the invalid list

    def exists(self):
        """Return True if this track should be skipped.

        That is the case when the target file is already on disk, or when the
        file name is listed in the module-level ``invalid_musics`` set.
        """
        return os.path.exists(self.targeting_file) or self.file_name in invalid_musics

    def download(self):
        """Fetch ``self.url`` and store the body in ``self.targeting_file``.

        Returns:
            The resulting status: "success" for an HTTP 200 response whose
            body was written to disk, otherwise "fail".

        Raises:
            Any exception raised by ``requests.get`` or by writing the file is
            propagated; ``self.status`` is "fail" in that case.
        """
        print("Checking url " + self.url)
        # Assume failure until the file has actually been written, so an
        # exception below never leaves a stale "pending" status behind.
        self.status = "fail"
        # The context manager releases the connection even if writing fails.
        # verify=False: TLS certificate verification is skipped.
        with requests.get(self.url, verify=False, timeout=5) as r:
            if r.status_code == 200:
                with open(self.targeting_file, 'wb') as f:
                    f.write(r.content)
                self.status = "success"
        return self.status
def convertToUtf8(str):
    """Decode text in which UTF-8 bytes are spelled as ``xHH`` tokens.

    Every ``x`` that is followed by two hexadecimal digits is read as one raw
    byte (for example ``"xe5x91xa8"`` is the three bytes E5 91 A8); every other
    character is kept as-is. The resulting byte sequence is decoded as UTF-8
    and all newline characters are removed from the result.

    The text is decoded directly rather than by building and executing Python
    source, so quotes or other special characters in scraped input cannot
    inject code.

    NOTE: a literal ``x`` that happens to be followed by two hex digits (as in
    "Maxed") cannot be told apart from an escaped byte and is read as a byte.

    Args:
        str: The text to decode. (The name shadows the builtin but is kept for
            compatibility with keyword callers.)

    Returns:
        The decoded text without newline characters.

    Raises:
        UnicodeDecodeError: If the collected bytes are not valid UTF-8.
    """
    raw = bytearray()
    length = len(str)
    pos = 0
    while pos < length:
        ch = str[pos]
        hex_pair = str[pos + 1: pos + 3]
        if ch == 'x' and len(hex_pair) == 2 and all(c in string.hexdigits for c in hex_pair):
            # "xHH" -> single byte 0xHH; skip the marker and both digits.
            raw.append(int(hex_pair, 16))
            pos += 3
        else:
            raw.extend(ch.encode('utf-8'))
            pos += 1
    return raw.decode('utf-8').replace("\n", "")
def getMp3Url(url):
    """Ask the resolver endpoint for a track's direct MP3 URL.

    Args:
        url: The resolver URL to query.

    Returns:
        The response body decoded as UTF-8, or "" if the request or the
        decoding fails.
    """
    try:
        # The context manager closes the response even if decoding raises.
        # verify=False: TLS certificate verification is skipped.
        with requests.get(url, verify=False, timeout=10) as res:
            return res.content.decode('utf-8')
    except Exception:
        # Best effort: any failure means "no URL". Unlike a bare except, this
        # lets KeyboardInterrupt/SystemExit stop the script.
        print("error to get mp3 url")
        return ""
def getMusicList(url, retry):
    """Download the chart page, retrying on failure.

    Args:
        url: Page to fetch.
        retry: Number of retries already used; pass 0 for a fresh call. A
            failed attempt is retried (after a one-second pause) until this
            counter exceeds 3, i.e. up to five attempts when starting from 0.

    Returns:
        The raw response body (bytes) on success, or "" once the retries are
        exhausted.
    """
    while True:
        try:
            # verify=False: TLS certificate verification is skipped.
            with requests.get(url, verify=False, timeout=20) as res:
                return res.content
        except Exception:
            # Unlike a bare except, this lets KeyboardInterrupt/SystemExit
            # stop the script instead of triggering another retry.
            if retry > 3:
                return ""
            time.sleep(1)
            retry += 1
            print("unable to access the kuwo.cn. Retries: {}".format(retry))
# A download of exactly this many bytes is treated as an invalid track: the
# file is deleted and its name is recorded so it is skipped on later runs.
INVALID_MUSIC_FILE_SIZE = 181521

content = getMusicList(root_url, 0)
# Treat "", None and an empty body alike: the chart page was not retrieved.
if not content:
    print("Unable to access the music list. Exit")
    raise SystemExit(0)

# NOTE: str() is applied to the fetched payload on purpose. For a bytes payload
# it yields the repr text ("b'...'") in which non-ASCII bytes appear as
# backslash-x escapes; the backslash stripping below together with
# convertToUtf8 turns those escapes back into text.
body = BeautifulSoup(str(content), "html.parser")
new_music_count = 0
for item in body.find_all("div", {"class": "tools"}):
    # Drop single quotes and backslashes so the attribute parses as JSON.
    json_payload = item["data-music"].replace("'", "").replace("\\", "")
    # Presumably some payloads arrive cut off inside a string value: close the
    # string and append an empty "pay" entry so the JSON is complete.
    if not json_payload.endswith("}"):
        json_payload += '", "pay":""}'
    music_data = json.loads(json_payload)
    music = Music(
        music_data["id"],
        convertToUtf8(music_data["artist"]) if "artist" in music_data else "",
        convertToUtf8(music_data["album"]) if "album" in music_data else "",
        convertToUtf8(music_data["name"]) if "name" in music_data else "",
        music_data["pay"],
    )
    music.file_name = music_name_template.format(
        name=music.name,
        artist=music.artist,
        album=music.album,
        id=music.id.replace("MUSIC_", ""),
    )
    music.targeting_file = "{}/{}".format(destination_folder, music.file_name)
    if music.exists():
        print("[Info] Skipping the music {}".format(music.targeting_file))
        continue
    new_music_count += 1
    music.url = getMp3Url(mp3_url_template.format(music.id))
    if music.url == "":
        continue
    try:
        music.download()
    except Exception:
        # Keep going with the next track; unlike a bare except, this still
        # lets Ctrl-C (KeyboardInterrupt) abort the run.
        print("error")
    print("[Info] Download music \"{}\" with the status: {}".format(music.file_name, music.status))
    # A file of exactly INVALID_MUSIC_FILE_SIZE bytes marks an invalid track:
    # remember its name, delete the file and do not count it as new.
    if os.path.exists(music.targeting_file) and os.path.getsize(music.targeting_file) == INVALID_MUSIC_FILE_SIZE:
        with open(invalid_music_file_path, 'a') as the_file:
            the_file.write("{}\n".format(music.file_name))
        invalid_musics.add(music.file_name)
        os.remove(music.targeting_file)
        new_music_count -= 1
        print("[Info] Delete the music \"{}\" due to the invalid size".format(music.file_name))
print("[Summary] Found {} new musics".format(new_music_count))
# print("Processing the music")
# MusicPatcher().process_music()