import requests
from bs4 import BeautifulSoup
import re
from urllib.parse import unquote
import os
import time
import glob
from dataclasses import dataclass
import json
from io import StringIO
from contextlib import redirect_stdout
from patch import MusicPatcher
import string
# Endpoint queried with a track id (substituted into ``rid``) to obtain the mp3 URL.
mp3_url_template = "https://antiserver.kuwo.cn/anti.s?type=convert_url&rid={}&format=mp3&response=url"
# File-name pattern for downloaded tracks; filled in with keyword arguments.
music_name_template = "{name}_{artist}_{album}_{id}.mp3"
# Both settings below can be overridden through the environment.
root_url = os.environ.get("MUSIC_URL", "https://www.kuwo.cn/bang/content?name=%E6%8A%96%E9%9F%B3%E7%83%AD%E6%AD%8C%E6%A6%9C")
destination_folder = os.environ.get("MUSIC_FOLDER", "/nas/music")
@dataclass
class Music:
    """One track from the chart page together with its download state.

    The first five fields are supplied by the caller (positionally, in this
    order); the remaining ones start with defaults and are filled in while
    the track is processed.  Declaring them as real dataclass fields makes
    the generated ``__eq__`` and ``__repr__`` reflect the track's data.
    """

    id: str
    artist: str
    album: str
    name: str
    pay: str
    # URL the mp3 is fetched from; empty until resolved by the caller.
    url: str = ""
    # Full path the mp3 is written to.
    targeting_file: str = ""
    # "pending" until download() runs, then "success" or "fail".
    status: str = "pending"
    # Base file name of the mp3.
    file_name: str = ""

    def exists(self):
        """Return True if ``targeting_file`` is already present on disk."""
        return os.path.exists(self.targeting_file)

    def download(self):
        """Fetch ``url`` and store the body in ``targeting_file``.

        Returns:
            The updated ``status``: "success" for an HTTP 200 response whose
            body was written, "fail" for any other status code.

        Raises:
            Whatever ``requests.get`` or the file write raises; ``status`` is
            left unchanged in that case.
        """
        print("Checking url " + self.url)
        # NOTE(review): verify=False disables TLS certificate validation;
        # kept to preserve behaviour, but worth revisiting.
        response = requests.get(self.url, verify=False, timeout=5)
        try:
            if response.status_code == 200:
                with open(self.targeting_file, 'wb') as f:
                    f.write(response.content)
                self.status = "success"
            else:
                self.status = "fail"
        finally:
            # Release the connection even when writing the file fails.
            response.close()
        return self.status
def convertToUtf8(str):
    """Decode text whose UTF-8 bytes appear as ``xNN`` pairs (backslash lost).

    Every ``x`` that is immediately followed by two hexadecimal digits is
    read as one raw byte; every other character is kept as-is.  The
    resulting byte sequence is decoded as UTF-8 and newlines are removed.

    The bytes are assembled directly instead of building Python source and
    running it through ``exec``: the input is scraped from a web page, so
    executing it allowed code injection and broke on quotes, newlines and
    non-ASCII characters.

    Args:
        str: The text to decode (the name shadows the builtin; it is kept
            for interface compatibility).

    Returns:
        The decoded text with all newline characters removed.

    Raises:
        UnicodeDecodeError: If the assembled bytes are not valid UTF-8, e.g.
            when ordinary text happens to contain ``x`` plus two hex digits.
    """
    text = str
    raw = bytearray()
    pos = 0
    length = len(text)
    while pos < length:
        ch = text[pos]
        pair = text[pos + 1: pos + 3]
        if ch == 'x' and len(pair) == 2 and all(c in string.hexdigits for c in pair):
            # "xNN" stands for the single byte 0xNN.
            raw.append(int(pair, 16))
            pos += 3
        else:
            raw += ch.encode('utf-8')
            pos += 1
    return raw.decode('utf-8').replace("\n", "")
def getMp3Url(url):
    """Request ``url`` and return the response body decoded as UTF-8.

    This is best-effort: any ordinary failure (network error, timeout,
    undecodable body) is reported on stdout and turned into an empty string
    so the caller can skip the track.

    Args:
        url: The address to query.

    Returns:
        The decoded response body, or "" if the request or decoding failed.
    """
    try:
        # NOTE(review): verify=False disables TLS certificate validation;
        # kept to preserve behaviour, but worth revisiting.
        res = requests.get(url, verify=False, timeout=10)
        try:
            return res.content.decode('utf-8')
        finally:
            # Close the response even when decoding raises.
            res.close()
    except Exception:
        # Exception (not a bare except) so Ctrl-C and SystemExit still
        # terminate the script.
        print("error to get mp3 url")
        return ""
# Fetch the chart page.  str() of the raw bytes produces the b'...' repr, in
# which non-ASCII bytes show up as \xNN escapes; the payload handling below
# strips the backslashes and convertToUtf8() turns the xNN pairs back into text.
res = requests.get(root_url, verify=False, timeout=20)
try:
    body = BeautifulSoup(str(res.content), "html.parser")
finally:
    res.close()

new_music_count = 0
# attrs must be a dict; the previous set literal {"class", "tools"} only
# worked because it was interpreted as a list of alternative class names.
for item in body.find_all("div", {"class": "tools"}):
    raw_payload = item.get("data-music")
    if raw_payload is None:
        # No track metadata on this element; nothing to download.
        continue
    json_payload = raw_payload.replace("'", "").replace("\\", "")
    if not json_payload.endswith("}"):
        # Close a payload that does not end with "}" so that it parses.
        json_payload += '", "pay":""}'
    try:
        music_data = json.loads(json_payload)
        music = Music(
            music_data["id"],
            convertToUtf8(music_data["artist"]) if "artist" in music_data else "",
            convertToUtf8(music_data["album"]) if "album" in music_data else "",
            convertToUtf8(music_data["name"]) if "name" in music_data else "",
            music_data["pay"],
        )
    except (ValueError, KeyError) as exc:
        # One malformed entry should not abort the whole run.
        print("[Warn] Skipping a malformed music entry: {!r}".format(exc))
        continue
    music.file_name = music_name_template.format(
        name=music.name,
        artist=music.artist,
        album=music.album,
        id=music.id.replace("MUSIC_", ""),
    )
    music.targeting_file = "{}/{}".format(destination_folder, music.file_name)
    if music.exists():
        print("[Info] Skipping the music {}".format(music.targeting_file))
        continue
    new_music_count += 1
    music.url = getMp3Url(mp3_url_template.format(music.id))
    if music.url == "":
        continue
    try:
        music.download()
    except Exception:
        # Report the failure instead of leaving the status at "pending".
        music.status = "fail"
        print("error")
    print("[Info] Download music \"{}\" with the status: {}".format(music.file_name, music.status))

print("[Summary] Found {} new musics".format(new_music_count))
print("Processing the music")
MusicPatcher().process_music()