import json import tempfile import zipfile import os from open_workout_coach.create_open_workout_database import create_workout_session, create_item, create_workout_database ITEM_KEY_MEDIA_TYPE_MAP = { 'video': 'videoPath', 'image': 'imagePath', } def get_all_media_from_db_by_type(db, media_type): item_key = ITEM_KEY_MEDIA_TYPE_MAP[media_type] media = list() for session in db['workoutSessions']: for workout_item in session['workoutItems']: media.append(workout_item[item_key]) media = list(set(media)) return media def map_source_destination(media_files, root_destination): media_maps = list() for file_path in media_files: dest = get_destination_maps(file_path, root_destination) media_maps.append((file_path, dest)) return media_maps def get_destination_maps(input_path, relative_destination): file = os.path.basename(input_path) destination_file_path = f'{relative_destination}/{file}' return destination_file_path def get_remote_formatted_media_path(root_name: str, media_type: str, file_path: str): actual_file = os.path.basename(file_path) remote_root = 'file:///data/user/0/com.health.openworkout/files' new_media_path = f'{remote_root}/{root_name}/{media_type}/{actual_file}' return new_media_path def refactor_media_paths_in_db(db): db_name = db['name'] for session in db['workoutSessions']: for workout_item in session['workoutItems']: item_has_video = workout_item['videoPath'] != '' if item_has_video: new_formatted_path = get_remote_formatted_media_path( media_type='video', root_name=db_name, file_path=workout_item['videoPath'] ) workout_item['videoPath'] = new_formatted_path item_has_image = workout_item['imagePath'] != '' if item_has_image: new_formatted_path = get_remote_formatted_media_path( media_type='image', root_name=db_name, file_path=workout_item['imagePath'] ) workout_item['imagePath'] = new_formatted_path return db def save_workout_file(output_files, db_file, video_files=tuple(), image_files=tuple()): video_maps = map_source_destination(video_files, 'video') image_maps = map_source_destination(image_files, 'image') media_maps = video_maps + image_maps db_json = tempfile.mktemp() with open(db_json, 'w') as f: json.dump(db_file, f) with zipfile.ZipFile(output_files, 'w') as zip_file: zip_file.write(db_json, arcname='database.json') for source, dest in media_maps: zip_file.write(source, arcname=dest) def map_config_to_workout_file(config): items = [ create_item( name=item['name'], workout_time=item.get('duration'), n_repetitions=item.get('repetitions'), description=item.get('description'), preparation_time=item.get('preparation'), break_time=item.get('break'), video_path=item.get('gif_path') ) for item in config['exercises'] ] sessions = [create_workout_session(items)] * config.get('n_sessions', 1) db = create_workout_database(config['name'], sessions) return db def process_config_into_workout_file(config, output_destination = None): db = map_config_to_workout_file(config) videos = get_all_media_from_db_by_type(db, media_type='video') images = get_all_media_from_db_by_type(db, media_type='image') db = refactor_media_paths_in_db(db) output_file_name = f"{output_destination or '.'}/{db['name']}.zip" save_workout_file( output_files=output_file_name, db_file=db, video_files=videos, )