# MusicDownloader / src / download.py
import requests
from bs4 import BeautifulSoup
import re
from urllib.parse import unquote
import os
import time
import glob
from dataclasses import dataclass
import json
from io import StringIO
from contextlib import redirect_stdout
import string
import urllib3
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

# URL template used to look up the MP3 location of a track; "{}" is filled
# with the track id.
mp3_url_template = "https://antiserver.kuwo.cn/anti.s?type=convert_url&rid={}&format=mp3&response=url"
# File-name pattern of a downloaded track inside destination_folder.
music_name_template = "{name}_{artist}_{album}_{id}.mp3"

# Both settings can be overridden through environment variables.
destination_folder = os.getenv('MUSIC_FOLDER', "/nas/music/kuwo")
root_url = os.getenv("MUSIC_URL", "https://www.kuwo.cn/bang/content?name=%E6%8A%96%E9%9F%B3%E7%83%AD%E6%AD%8C%E6%A6%9C")

# Text file with one file name per line, listing tracks previously judged
# invalid; it is created empty on first use so it can always be read below.
invalid_music_file_path = os.path.join(destination_folder, "__INVALID_MUSICS.txt")
if not os.path.exists(invalid_music_file_path):
    open(invalid_music_file_path, 'a').close()
# Read the list inside a context manager so the handle is closed right away
# instead of being left to the garbage collector.
with open(invalid_music_file_path) as invalid_music_file:
    invalid_musics = {line.strip() for line in invalid_music_file}
@dataclass
class Music:
    """One track together with its download state.

    The first five fields are required and keep the positional order of the
    previous hand-written constructor (id, artist, album, name, pay).  The
    remaining fields start with the same defaults as before and are filled
    in by the caller.

    Declaring real dataclass fields makes the generated ``__eq__`` and
    ``__repr__`` reflect the instance data; with no declared fields every
    instance compared equal and printed as ``Music()``.
    """

    id: str
    artist: str
    album: str
    name: str
    pay: str
    url: str = ""             # download URL; empty until resolved
    targeting_file: str = ""  # full path the track is written to
    status: str = "pending"   # "pending", then "success" or "fail" after download()
    file_name: str = ""       # base file name; also the key looked up in invalid_musics

    def exists(self):
        """Return True when the track should be skipped.

        That is the case when the target file is already on disk or when the
        file name is listed in the module-level ``invalid_musics`` set.
        """
        return os.path.exists(self.targeting_file) or self.file_name in invalid_musics

    def download(self):
        """Fetch ``self.url`` and store the body in ``self.targeting_file``.

        Returns the resulting status: "success" for an HTTP 200 response,
        "fail" for any other status code.  Network and file errors raised
        by ``requests`` or ``open`` propagate to the caller.
        """
        print("Checking url " + self.url)
        r = requests.get(self.url, verify=False, timeout=5)
        try:
            if r.status_code == 200:
                with open(self.targeting_file, 'wb') as f:
                    f.write(r.content)
                self.status = "success"
            else:
                self.status = "fail"
        finally:
            # Release the connection even when writing the file fails.
            r.close()
        return self.status

def convertToUtf8(str):
    """Decode text whose UTF-8 bytes are spelled as 'x' plus two hex digits.

    Every 'x' that is followed by two hexadecimal digits is read as one raw
    byte (for example "xe6x8ax96" holds the three bytes E6 8A 96).  All other
    characters are kept as they are.  The resulting byte sequence is decoded
    as UTF-8 and newline characters are removed from the result.

    The bytes are assembled directly instead of building Python source and
    running it through exec(), so quotes, backslashes or any other text in
    the (scraped, untrusted) input can neither break the conversion nor be
    executed as code.  Backslashes in the input are therefore kept literally
    rather than being interpreted as escape sequences.

    Raises UnicodeDecodeError when the assembled bytes are not valid UTF-8.
    """
    text = str  # the parameter name shadows the builtin; use a local alias
    length = len(text)
    raw = bytearray()
    pos = 0
    while pos < length:
        ch = text[pos]
        is_escape = (
            ch == 'x'
            and length > pos + 2
            and all(c in string.hexdigits for c in text[pos + 1: pos + 3])
        )
        if is_escape:
            raw.append(int(text[pos + 1: pos + 3], 16))
            pos += 3
        else:
            raw.extend(ch.encode("utf-8"))
            pos += 1
    # The previous implementation captured print() output and stripped every
    # newline from it; keep stripping newlines from the decoded text.
    return raw.decode("utf-8").replace("\n", "")

def getMp3Url(url):
    """Request ``url`` and return the response body decoded as UTF-8.

    The caller uses the returned text as the download URL of a track.
    Returns an empty string when the request or the decoding fails.
    """
    try:
        res = requests.get(url, verify=False, timeout=10)
        try:
            return res.content.decode('utf-8')
        finally:
            # Close the response even when decoding raises.
            res.close()
    except Exception as err:
        # Best-effort lookup: report the cause and let the caller skip this
        # track.  Unlike a bare "except:", this lets KeyboardInterrupt and
        # SystemExit propagate.
        print("error to get mp3 url: {}".format(err))
        return ""


def getMusicList(url, retry, delay=1):
    """Download ``url`` and return the raw response body.

    Parameters:
        url:   page to fetch.
        retry: number of retries already used; pass 0 for a fresh call.
               A failed attempt is retried while this counter is at most 3.
        delay: seconds to wait before each retry (default 1).

    Returns the response content, or an empty string once the retries are
    exhausted.
    """
    attempt = retry
    while True:
        try:
            # Fetch the URL that was passed in rather than the module-level
            # default, so the parameter is actually honoured.
            res = requests.get(url, verify=False, timeout=20)
            try:
                return res.content
            finally:
                res.close()
        except Exception:
            # Best-effort fetch; KeyboardInterrupt/SystemExit still propagate.
            if attempt > 3:
                return ""
            time.sleep(delay)
            attempt += 1
            print("unable to access the kuwo.cn. Retries: {}".format(attempt))
            # Looping (instead of recursing without "return") guarantees the
            # result of the retry reaches the caller.




# A downloaded file of exactly this many bytes is treated as an invalid track
# (per the original author's note, invalid tracks come back with one
# identical size).
INVALID_MUSIC_FILE_SIZE = 181521

content = getMusicList(root_url, 0)
# Any falsy result ("" / None / empty body) means the page could not be read.
if not content:
    print("Unable to access the music list. Exit")
    exit(0)
# NOTE: str() is applied to the raw response on purpose.  For a bytes body
# this yields its repr, in which non-ASCII bytes appear as \xNN escapes; the
# backslash stripping below and convertToUtf8() rely on that representation.
body = BeautifulSoup(str(content), "html.parser")
new_music_count = 0
# attrs must be a dict; a set literal here would match either class name.
for item in body.find_all("div", {"class": "tools"}):
    raw_payload = item.get("data-music")
    if raw_payload is None:
        # Not a track entry: nothing to parse.
        continue
    json_payload = raw_payload.replace("'", "").replace("\\", "")
    if not json_payload.endswith("}"):
        # Close a payload that was cut off before its end.
        json_payload += '", "pay":""}'
    try:
        music_data = json.loads(json_payload)
        music = Music(
            music_data["id"],
            convertToUtf8(music_data["artist"]) if "artist" in music_data else "",
            convertToUtf8(music_data["album"]) if "album" in music_data else "",
            convertToUtf8(music_data["name"]) if "name" in music_data else "",
            music_data["pay"],
        )
    except (ValueError, KeyError) as err:
        # ValueError covers malformed JSON and undecodable text; one broken
        # entry should not abort the remaining downloads.
        print("[Warn] Skipping an unreadable music entry: {}".format(err))
        continue
    music.file_name = music_name_template.format(
        name=music.name,
        artist=music.artist,
        album=music.album,
        id=music.id.replace("MUSIC_", ""),
    )
    music.targeting_file = "{}/{}".format(destination_folder, music.file_name)
    if music.exists():
        print("[Info] Skipping the music {}".format(music.targeting_file))
        continue
    new_music_count += 1
    music.url = getMp3Url(mp3_url_template.format(music.id))
    if music.url == "":
        continue
    try:
        music.download()
    except Exception as err:
        # Record the failure so the status line below does not claim the
        # download is still "pending", and show what went wrong.
        music.status = "fail"
        print("error: {}".format(err))
    print("[Info] Download music \"{}\" with the status: {}".format(music.file_name, music.status))
    # Remember invalid tracks by file name so later runs skip them, and remove
    # the useless file again.
    if os.path.exists(music.targeting_file) and os.path.getsize(music.targeting_file) == INVALID_MUSIC_FILE_SIZE:
        with open(invalid_music_file_path, 'a') as the_file:
            the_file.write("{}\n".format(music.file_name))
        invalid_musics.add(music.file_name)
        os.remove(music.targeting_file)
        new_music_count -= 1
        print("[Info] Delete the music \"{}\" due to the invalid size".format(music.file_name))

print("[Summary] Found {} new musics".format(new_music_count))

# print("Processing the music")
# MusicPatcher().process_music()