Newer
Older
MusicDownloader / src / main.py
import requests
from bs4 import BeautifulSoup
import re
from urllib.parse import unquote
import os
import time
import glob
from dataclasses import dataclass
import json
from io import StringIO
from contextlib import redirect_stdout
from patch import MusicPatcher
import string

# URL template formatted with a track id (the "rid" query parameter); the
# script uses the body of the response as the MP3 download URL.
mp3_url_template = (
    "https://antiserver.kuwo.cn/anti.s"
    "?type=convert_url&rid={}&format=mp3&response=url"
)
# File-name pattern for a saved track.
music_name_template = "{name}_{artist}_{album}_{id}.mp3"

# Both settings can be overridden through environment variables.
# MUSIC_FOLDER: directory the MP3 files are written to.
destination_folder = os.environ.get('MUSIC_FOLDER', "/nas/music")
# MUSIC_URL: chart page to scrape (the default "name" query value is
# percent-encoded UTF-8).
root_url = os.environ.get(
    "MUSIC_URL",
    "https://www.kuwo.cn/bang/content"
    "?name=%E6%8A%96%E9%9F%B3%E7%83%AD%E6%AD%8C%E6%A6%9C",
)
@dataclass
class Music:
    """One track plus the state of its download.

    The first five fields are required and keep the original positional
    order ``(id, artist, album, name, pay)``. The remaining fields default to
    the values a freshly created track starts with and are filled in later by
    the caller.

    Declaring real dataclass fields (instead of a hand-written ``__init__``)
    makes the generated ``__eq__`` and ``__repr__`` reflect the track's data;
    with no declared fields every instance compared equal to every other.
    """

    id: str
    artist: str
    album: str
    name: str
    pay: str  # stored as given; not interpreted by this class
    url: str = ""  # download URL, read by download()
    targeting_file: str = ""  # destination path on disk
    status: str = "pending"  # "pending", "success" or "fail"
    file_name: str = ""

    def exists(self):
        """Return True if ``targeting_file`` already exists on disk."""
        return os.path.exists(self.targeting_file)

    def download(self):
        """Fetch ``url`` and save the response body to ``targeting_file``.

        Sets ``status`` to ``"success"`` on HTTP 200 and to ``"fail"``
        otherwise, and returns the new status.

        Raises:
            Any exception raised by the request or by writing the file is
            propagated; ``status`` is set to ``"fail"`` before re-raising.
        """
        print("Checking url " + self.url)
        try:
            # NOTE(review): verify=False disables TLS certificate
            # verification. Kept because removing it may break downloads
            # from this server, but it should be confirmed as intentional.
            with requests.get(self.url, verify=False, timeout=5) as r:
                if r.status_code == 200:
                    with open(self.targeting_file, 'wb') as f:
                        f.write(r.content)
                    self.status = "success"
                else:
                    self.status = "fail"
        except Exception:
            # Record the failure so callers that catch the exception do not
            # see a stale "pending" status.
            self.status = "fail"
            raise
        return self.status

def convertToUtf8(str):
    """Decode text containing ``xNN`` hex-byte markers into a Unicode string.

    Every ``x`` that is followed by two hexadecimal digits is read as one raw
    byte; every other character is kept as-is. The resulting byte sequence is
    decoded as UTF-8 and any newline characters are removed from the result.

    The bytes are assembled directly instead of generating and executing
    Python source, so quotes, backslashes and non-ASCII characters in the
    input are treated as ordinary text and cannot run code.

    Args:
        str: The text to convert. The name shadows the builtin and is kept
            only for backward compatibility with existing callers.

    Returns:
        The decoded string with all newlines removed.

    Raises:
        UnicodeDecodeError: If the assembled bytes are not valid UTF-8.
    """
    text = str
    raw = bytearray()
    i = 0
    length = len(text)
    while i < length:
        ch = text[i]
        pair = text[i + 1:i + 3]
        if ch == 'x' and len(pair) == 2 and all(c in string.hexdigits for c in pair):
            # "xNN" -> the single byte 0xNN; skip the marker and both digits.
            raw.append(int(pair, 16))
            i += 3
        else:
            raw.extend(ch.encode('utf-8'))
            i += 1
    return raw.decode('utf-8').replace("\n", "")

def getMp3Url(url, timeout=5):
    """Fetch *url* and return the response body decoded as UTF-8.

    Args:
        url: Address to request.
        timeout: Seconds to wait for the server before giving up. Without a
            timeout the request could block indefinitely.

    Returns:
        The response body as a string. The HTTP status code is not checked.

    Raises:
        Any exception raised by the request (including a timeout) or by
        decoding the body is propagated to the caller.
    """
    # The context manager closes the response even if decoding fails.
    with requests.get(url, timeout=timeout) as res:
        return res.content.decode('utf-8')



def _parse_music(item):
    """Build a ``Music`` from one element's ``data-music`` attribute.

    Raises ``KeyError``/``ValueError`` (or whatever ``convertToUtf8`` raises)
    when the attribute is missing or its payload cannot be parsed.
    """
    # Single quotes and backslashes are dropped before JSON parsing; the text
    # fields then carry bare "xNN" byte markers that convertToUtf8 decodes.
    json_payload = item["data-music"].replace("'", "").replace("\\", "")
    # A payload that does not end with "}" is closed off with an empty "pay".
    if not json_payload.endswith("}"):
        json_payload += '", "pay":""}'
    music_data = json.loads(json_payload)
    music = Music(
        music_data["id"],
        convertToUtf8(music_data["artist"]) if "artist" in music_data else "",
        convertToUtf8(music_data["album"]) if "album" in music_data else "",
        convertToUtf8(music_data["name"]) if "name" in music_data else "",
        # "pay" may be absent even when the payload is complete.
        music_data.get("pay", ""),
    )
    music.file_name = music_name_template.format(
        name=music.name,
        artist=music.artist,
        album=music.album,
        id=music.id.replace("MUSIC_", ""),
    )
    music.targeting_file = "{}/{}".format(destination_folder, music.file_name)
    return music


# Fetch the chart page. str() of the raw bytes yields their repr, so non-ASCII
# bytes show up as escape text; the parsing above relies on that form.
with requests.get(root_url, timeout=10) as res:
    body = BeautifulSoup(str(res.content), "html.parser")
new_music_count = 0
# Match on the class attribute with a proper dict.
for item in body.find_all("div", {"class": "tools"}):
    try:
        music = _parse_music(item)
    except Exception as exc:
        # One malformed entry must not abort the whole run.
        print("[Warn] Skipping unparsable entry: {}".format(exc))
        continue
    if music.exists():
        # Already downloaded on a previous run.
        continue
    new_music_count += 1
    try:
        # The URL lookup is inside the try so a failed lookup only affects
        # this track.
        music.url = getMp3Url(mp3_url_template.format(music.id))
        music.download()
    except Exception as exc:
        music.status = "fail"
        print("error: {}".format(exc))
    print("[Info] Download music \"{}\" with the status: {}".format(music.file_name, music.status))
print("[Summary] Found {} new musics".format(new_music_count))

print("Processing the music")
MusicPatcher().process_music()