#!/usr/bin/env python3 import argparse import collections import glob import json import music_tag import os from pathlib import Path import requests import shutil import sys import youtube_dl from ytmusicapi import YTMusic # import musicpd # TODO: set this to false to begin with. for now it's always gonna print # debug information DEBUG = True # Includes the format characters YOUTUBE_MUSIC_URL = "https://music.youtube.com/watch?v={}" """ How the albums will be formatted: { 'id': { 'name': '', 'tracks': [ { 'id': '', 'name': '' } ], } } """ ALBUMS = collections.defaultdict(lambda: None) # Array to save any failed songs and their reasons FAILED_SONGS = [] class UnableToCreateAlbumDirectoryError(Exception): pass class UnableToFindThumbnailFileError(Exception): pass def get_command_line_options(): parser = argparse.ArgumentParser( description="Download songs from YouTube Music") # NOTE: I'm not sure if this param is necessary anymore. If playlist # is public/unlisted, then it shouldn't need the cookies. parser.add_argument("--cookies", metavar="cookiefile.txt", # TODO: store this in a config file default=os.path.join(os.path.expanduser("~"), "cookies.txt"), type=str, help="Cookie file to use.") parser.add_argument("--debug_location", metavar="~/Music/jamos_debug", default=None, type=str, help="Cookie file to use.") # NOTE: unlike cookies, this is required to get the library parser.add_argument("--headers", metavar="header_path", # TODO: store this in a config file default='~/.config/jamos/headers.json', type=str, help="Header file to use.") parser.add_argument("-o", "--output", metavar="output_directory", # TODO: store it in a config file default=os.path.join(os.path.expanduser("~"), "Music"), type=str, help="Output directory to use") parser.add_argument("--playlist_id", metavar='PLFsQleAWXsj_4yDeebiIADdH5FMayBiJo', nargs='?', type=str, help="""Playlist id to download. Playlist MUST be public or unlisted, but NOT private""") parser.add_argument("-r", "--retry_file", metavar="jamos_failed_urls.txt", default=None, type=str, help="File to store songs that were not completed") parser.add_argument("--song_id", metavar="lYBUbBu4W08", nargs='?', type=str, help="Song id to download.") parser.add_argument("--song_limit", metavar="5000", default=5000, type=int, help="Number of songs to download") parser.add_argument("-t", "--thumbnail_folder", metavar="~/Music/jamos_thumbnails", # TODO: store it in a config file default=os.path.join(os.path.expanduser("~"), "Music", "jamos_thumbnails"), type=str, help="Directory to save thumbnails for albums") parser.add_argument("-v", "--verbose", action='store_true', help="Directory to save thumbnails for albums") args = parser.parse_args() # Validate args if args.verbose: global DEBUG DEBUG = True if args.song_id and args.playlist_id: parser.error('--song_id and --playlist_id may not be passed together!') if args.song_id and args.retry_file: parser.error('--song_id and --retry_file may not be passed together!') if args.playlist_id and args.retry_file: parser.error( "--playlist_id and --retry_file may not be passed together!") return args def create_downloader(music_directory, cookies=None): audio_options = { 'format': 'mp3/bestaudio/best', 'cookiefile': cookies, 'outtmpl': os.path.join(music_directory, '%(id)s.%(ext)s'), 'postprocessors': [ { 'key': 'FFmpegExtractAudio', 'preferredcodec': 'mp3', 'preferredquality': '192', }, {'key': 'FFmpegMetadata'}, ], # 'writeinfojson': True, 'quiet': not DEBUG } return youtube_dl.YoutubeDL(audio_options) def remove_special_characters_for_filename(filename): special_chars = [ ['-', ' '], ['(', ''], [')', ''], ['/', ' '], ['/', ' '], [' ', '_'], ["'", ''], ["&", 'and'], [chr(8217), ''], ['$', 's'], ['.', ''] ] new_name = filename for char_set in special_chars: new_name = new_name.replace(char_set[0], char_set[1]) return new_name.lower() def create_youtube_music_api_object(header_path): return YTMusic(header_path) def save_album(app, args, album_id): raw_album = app.get_album(album_id) # TODO: Add get_thumbnail function thumbnail_url = raw_album['thumbnails'][-1]['url'] parsed_album = { 'name': raw_album['title'], 'tracks': [{'title': song['title'], 'id': song['videoId'], 'track_num': index + 1} for index, song in enumerate(raw_album['tracks'])], 'year': raw_album['year'], 'thumbnail_url': thumbnail_url, 'thumbnail_filepath': None } try: r = requests.get(thumbnail_url, stream=True) filename = os.path.join(args.thumbnail_folder, album_id + '.jpg') if not os.path.isfile(filename): # Check if the image was retrieved successfully if r.status_code == 200: # Set decode_content value to True, # otherwise the downloaded image file's size will be zero. r.raw.decode_content = True # Open a local file with wb ( write binary ) permission. with open(filename, 'wb') as f: shutil.copyfileobj(r.raw, f) print('Thumbnail sucessfully Downloaded: {}'.format(filename)) else: print('Thumbnail could not be retrieved: {}'.format(filename)) parsed_album['thumbnail_filepath'] = filename except Exception as ex: # NOTE: This is gonna be a pain to fix if a thumbnail doesn't download print("Could not download thumbnail") print(ex) ALBUMS[album_id] = parsed_album def save_debug_data(filename, data, debug=True): if debug: with open(filename, 'w') as f: f.write(data) # TODO: Good place for some fancy yield work? def parse_songs(app, args, all_raw_song_data): all_parsed_songs = [] for raw_song in all_raw_song_data: song_id = raw_song['videoId'] album_id = raw_song['album']['id'] if not ALBUMS[album_id]: save_album(app, args, album_id) album = ALBUMS[album_id] track_num = None for track in ALBUMS[album_id]['tracks']: if track['id'] == song_id: track_num = track['track_num'] parsed_song = { 'id': raw_song['videoId'], 'title': raw_song['title'], 'artists': [artist['name'] for artist in raw_song['artists']], 'album': album['name'], 'track': track_num, 'url': YOUTUBE_MUSIC_URL.format(raw_song['videoId']), 'year': album['year'], 'thumbnail_filepath': album['thumbnail_filepath'] } all_parsed_songs.append(parsed_song) if args.debug_location: save_debug_data('parsed_album_data.json', json.dumps(ALBUMS), debug=DEBUG) save_debug_data('parsed_song_data.json', json.dumps(all_parsed_songs), debug=DEBUG) return all_parsed_songs def get_library_songs(app, args, order='a_to_z'): all_raw_song_data = app.get_library_songs(limit=args.song_limit, order=order) if args.debug_location: path = os.path.join(args.debug_location, 'raw_song_data.json') save_debug_data(path, json.dumps(all_raw_song_data), debug=DEBUG) all_parsed_songs = parse_songs(app, args, all_raw_song_data) return all_parsed_songs # def stuff(): # # TODO: Can you use the cookies from YTMusic here # # cookies = os.path.join(os.path.expanduser("~"), "cookies.txt") # cookies = './cookies.txt' # ytdl = create_downloader(music_directory, cookies) # failed_songs = [] # for song in []: # try: # ytdl.extract_info(song['url'], download=True) # except Exception as ex: # print(ex) # print("Could not download: {}".format((song))) # failed_songs.append(song) # failed_songs = [] # for song in songs: # try: # print("Downloading: {} - {} - {} from {}...".format( # song['title'], song['artist'], song['album'], song['url']), # end='') # ytdl.extract_info(song['url'], download=True) # sys.exit() # print("Done.") # except Exception as ex: # print(ex) # print("Could not download: {}".format((song['url']))) # failed_songs.append(song) # files = get_all_files(music_directory) # counter = 1 # for f in files: # try: # print("Adding metadata to {} ...".format(f), end="") # with open(f.replace('.mp3', '.info.json')) as json_file: # json_data = json.load(json_file) # metadata = get_song_metadata_from_json(json_data, counter) # write_metadata_to_song_file(f, metadata) # print("Done") # print("Moving file...", end="") # move_file(f, metadata, music_directory) # print("Done") # counter += 1 # except Exception as e: # # just gonna print this and move on to the next file. # print(e) # print("Cleaning up JSON files...", end='') # cleanup_metadata_files(music_directory) # print("Done.") # TODO: Implement def retry_downloading_songs(): # try: # if len(failed_urls) > 1: # print("Saving failed urls to txt file.") # save_urls_from_playlist_to_file( # os.path.join(music_directory, "jamos_failed_urls.txt"), # failed_urls) # elif args.retry_file: # # Just because we don't have any failed urls in this run, doesn't # # mean that we can get rid of the retry file. We'll only remove it # # if it's been explicitly tried and we have no failed urls. # # We've successfully downloaded all of the previously failed urls. # # Delete the file # os.remove(args.retry_file) # except Exception as ex: # print(ex) # print("Saving failed urls to file failed! Printing failed urls:") # for url in failed_urls: # print(url) print("Not Implemented Yet!") sys.exit() # TODO: Implement def handle_downloading_playlist(): print("Not Implemented Yet!") sys.exit() pass # def sanitize_for_filename(filename): # new_filename = filename.replace(' ', '_') # new_filename = (''.join( # [s for s in new_filename if s.isalnum() or s == '_'])).lower() # return new_filename def tag_song(filename, song_data): try: # tag basic data file = music_tag.load_file(filename) file['name'] = song_data['title'] file['artist'] = song_data['artists'][0] file['albumartist'] = song_data['artists'][0] file['album'] = song_data['album'] file['tracknumber'] = song_data['track'] file['year'] = song_data['year'] # include album cover try: # NOTE: this will be at least None if song_data['thumbnail_filepath']: with open(song_data['thumbnail_filepath'], 'rb') as img_in: file['artwork'] = img_in.read() except Exception: raise UnableToFindThumbnailFileError() # save music tag data file.save() except Exception: print("could not tag song!") def move_song_to_permanent_location(music_directory, song_data, filename): try: # move song to music_directory/artist/album/artist_album_title.ext # excluding non filesafe characters artist_for_filename = remove_special_characters_for_filename( song_data['artists'][0]) album_for_filename = remove_special_characters_for_filename( song_data['album']) title_for_filename = remove_special_characters_for_filename( song_data['title']) song_output_dir = os.path.join(music_directory, artist_for_filename, album_for_filename) try: Path(song_output_dir).mkdir(parents=True, exist_ok=True) except OSError as ex: raise UnableToCreateAlbumDirectoryError(ex) new_filename = '{}_{}_{}.mp3'.format(artist_for_filename, album_for_filename, title_for_filename) shutil.move(filename, os.path.join(song_output_dir, new_filename)) except Exception: print("could not move file to correct directory!") def download_songs(ytdl, music_directory, songs): for song in songs: try: # download song ytdl.extract_info(song['url'], download=True) # get filename song was downloaded to filenames = glob.glob( os.path.join(music_directory, song['id'] + '.*')) filename = filenames[0] if not filename: raise FileNotFoundError() tag_song(filename, song) move_song_to_permanent_location(music_directory, song, filename) except UnableToCreateAlbumDirectoryError as ex: add_to_failed_songs(song, ex) print('Could not download {} - {}, moving on'.format( song['title'], song['artists'][0])) except FileNotFoundError: error_reason = 'Thought song downloaded, but filename was None' add_to_failed_songs(song, error_reason) print('Could not download {} - {}, moving on'.format( song['title'], song['artists'][0])) except Exception: add_to_failed_songs( song, 'Unknown exception while downloading song.') print('Could not download {} - {}, moving on'.format( song['title'], song['artists'][0])) def add_to_failed_songs(song, reason): song['error_reason'] = reason global FAILED_SONGS FAILED_SONGS.append(song) if __name__ == "__main__": try: args = get_command_line_options() music_directory = args.output app = create_youtube_music_api_object(args.headers) songs = get_library_songs(app, args) ytdl = create_downloader(music_directory) download_songs(ytdl, music_directory, songs) except Exception as ex: print(ex) sys.exit() print("Complete.")