#!/usr/bin/env python3
"""dalias: interactively map drive-slot aliases to /dev/disk/by-path entries.

The program reads a list of alias names from a config file, asks the operator
to insert or remove a drive in each slot in turn, records which
/dev/disk/by-path entry changed, writes the result to /etc/vdev_id.conf and
finally reloads and triggers udev.
"""
import os
import sys
import subprocess
from time import sleep
from optparse import OptionParser
import threading
import copy

# Most recent /dev/disk/by-path listing published by wait_for_changes().
device_list = []
# Set by wait_for_changes() once device_list holds a changed listing.
device_list_ready = threading.Event()
dalias_version = "2.0.0"

ANSI_colors = {
    "LGREEN": '\033[1;32m',
    "GREEN": '\033[0;32m',
    "YELLOW": '\033[0;33m',
    "MAGENTA": '\033[0;35m',
    "CYAN": '\033[0;36m',
    "WHITE": '\033[0;37m',
    "RED": '\033[0;31m',
    "GREY": '\033[1;30m',
    "END": '\033[0m',
}

# 80-column horizontal rule used around every section title.
_SEPARATOR = "-" * 80


def _colored(text, color):
    """Return *text* wrapped in the named ANSI colour and a reset code."""
    return "{c}{t}{e}".format(c=ANSI_colors[color], t=text, e=ANSI_colors["END"])


def _print_header(title):
    """Print *title* in white between two horizontal rules."""
    print(_SEPARATOR)
    print(" " + _colored(title, "WHITE"))
    print(_SEPARATOR)


def _run_quiet(command):
    """Run *command* with output discarded and return its exit status.

    A command that cannot be started at all (for example the binary is not
    installed) is reported as the non-zero status 127 instead of raising.
    """
    try:
        return subprocess.run(
            command,
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
        ).returncode
    except OSError:
        return 127


def load_config(path):
    """Return the unique alias names listed in the config file at *path*.

    Blank lines and lines whose first non-blank character is ``#`` are
    skipped. Surrounding whitespace and embedded spaces are removed from each
    alias, and duplicates are dropped while keeping first-seen order.

    Prints a message and exits with status 1 if *path* is not a regular file.
    """
    if not os.path.isfile(path):
        print("invalid config path provided {p}".format(p=path))
        sys.exit(1)

    with open(path) as config_file:
        raw_lines = config_file.read().splitlines()

    aliases = []
    seen = set()
    for raw_line in raw_lines:
        alias = raw_line.strip().replace(" ", "")
        if not alias or alias.startswith("#") or alias in seen:
            continue
        seen.add(alias)
        aliases.append(alias)
    return aliases


def check_root():
    """Exit with status 1 unless the process has an effective UID of 0."""
    if os.geteuid() != 0:
        print("dalias must be run with root privileges")
        sys.exit(1)


def get_devices():
    """Return the entry names currently present in /dev/disk/by-path."""
    device_path = "/dev/disk/by-path"
    return os.listdir(device_path)


def confirm_aliases(aliases):
    """Show *aliases* and exit with status 1 unless the user answers y/Y."""
    _print_header("Confirm drive aliases")
    print("The following drive aliases will be used:", end="\n\t")
    print("\n ".join(aliases))
    confirm = input("\nis this ok? (y/n): ")
    if confirm not in ["y", "Y"]:
        print("modify aliases to be used by editing the config file provided, or provide an alternate config file.")
        sys.exit(1)


def wait_for_changes(devices):
    """Poll /dev/disk/by-path until its listing differs from *devices*.

    A dot is printed for every 1.5 second polling interval. When a change is
    seen, the new listing is stored in the module-level ``device_list`` and
    ``device_list_ready`` is set.
    """
    global device_list
    current_devices = get_devices()
    while current_devices == devices:
        print(".", end="", flush=True)
        sleep(1.5)
        current_devices = get_devices()
    device_list = copy.deepcopy(current_devices)
    device_list_ready.set()


def get_alias_path(alias, used_slots):
    """Prompt for a drive insert/remove and return the by-path entry for *alias*.

    The function loops until exactly one new, previously unused entry is
    detected. Only entries that do not contain ``part`` and that end in
    ``.0`` are considered (presumably whole-disk entries - confirm against
    the by-path naming on the target hardware).

    *used_slots* is the caller's list of entries already assigned; the
    captured entry is appended to it.
    """
    prompt = " {g}Insert{e}/{y}Remove{e} a drive from slot ({w}{a}{e}):".format(
        a=alias,
        g=ANSI_colors["GREEN"],
        y=ANSI_colors["YELLOW"],
        w=ANSI_colors["WHITE"],
        e=ANSI_colors["END"],
    )
    path = None
    while path is None:
        print(prompt, end="", flush=True)
        device_list_old = list(get_devices())

        # The event is shared between rounds, so reset it before starting the
        # watcher; otherwise wait() would return immediately with stale data.
        device_list_ready.clear()
        watcher = threading.Thread(
            target=wait_for_changes,
            args=(device_list_old,),
            daemon=True,  # do not keep the process alive after Ctrl-C
        )
        watcher.start()
        device_list_ready.wait()
        watcher.join()
        device_list_new = device_list

        if len(device_list_new) == len(device_list_old):
            # Same number of entries but different content: cannot tell which
            # slot changed, so ask again.
            print("Comparison failed, try again in 5 seconds.")
            continue

        old_entries = set(device_list_old)
        new_entries = set(device_list_new)
        if len(device_list_new) > len(device_list_old):
            changed = new_entries - old_entries
            status = " " + _colored("[Drive Inserted]", "GREEN")
        else:
            changed = old_entries - new_entries
            status = " " + _colored("[Drive Removed]", "YELLOW")
        slot = [x for x in changed if "part" not in x and x.endswith(".0")]
        print(status, flush=True, end="")

        if len(slot) == 1 and slot[0] not in used_slots:
            print(" " + _colored("[Device Path Captured]", "WHITE"), flush=True)
            used_slots.append(slot[0])
            path = slot[0]
        else:
            print(" " + _colored("[Duplicate Device Path Found - Ignoring]", "RED"), flush=True)
    return path


def print_alias_dict(alias_dict):
    """Print *alias_dict* as a two-column ALIAS | PATH table."""
    length = 15
    rule = "-" * (length + 1) + "+" + "-" * (80 - (length + 2))
    print(rule, flush=True)
    print("{k:^{l}} | {v}".format(k="ALIAS", l=length, v="PATH"), flush=True)
    print(rule, flush=True)
    for alias, path in alias_dict.items():
        print("{k:^{l}} | {v}".format(k=alias, l=length, v=path), flush=True)


def detect_drives(aliases, initial_devices):
    """Interactively resolve every alias and return ``{alias: full_path}``.

    *initial_devices* is not used by the detection itself (each round takes
    its own snapshot); the parameter is kept for interface compatibility.
    """
    directory = "/dev/disk/by-path/"
    alias_dict = {}
    used_slots = []
    _print_header("Drive Detection Initiated ")
    for alias in aliases:
        alias_dict[alias] = directory + get_alias_path(alias, used_slots)
    _print_header("Drive Detection Completed")
    return alias_dict


def get_confirmation(alias_dict):
    """Summarise *alias_dict*, then write /etc/vdev_id.conf and fire udev.

    Exits with status 1 if the user declines or if the required udev rule
    files cannot be put in place. An existing vdev_id.conf is only removed
    once the user has confirmed and the udev files have been verified, right
    before the replacement is written.
    """
    vdev_id_filename = "vdev_id.conf"
    default_path = "/etc/" + vdev_id_filename

    # print summary
    _print_header("Drive Detection Summary")
    print_alias_dict(alias_dict)

    # perform confirmation
    _print_header("Confirmation ")
    confirmation = input(
        "\nWould you like to create/replace {w}{f}{e} and trigger udev rules? (y/n): ".format(
            f=default_path, w=ANSI_colors["WHITE"], e=ANSI_colors["END"]
        )
    )
    if confirmation not in ["y", "Y"]:
        sys.exit(1)

    # generate vdev_id.conf content
    content_lines = ["# This file was generated using dalias ({v})\n".format(v=dalias_version)]
    content_lines.extend(
        "alias {a} {v}\n".format(a=alias, v=path) for alias, path in alias_dict.items()
    )
    file_content = "".join(content_lines)

    # Download udev rules
    _print_header("Gathering udev rules ")
    if not verify_vdev():
        print("Unable to obtain required udev rules files")
        sys.exit(1)

    # print off vdev_id.conf file content
    _print_header(default_path)
    print(file_content)

    # remove the existing vdev_id.conf file, then write the new one to disk
    if os.path.isfile(default_path):
        os.remove(default_path)
    with open(default_path, "w") as vdev_file:
        vdev_file.write(file_content)

    # trigger udev rules
    _print_header("Triggering udev rules ")
    reload_udev()
    trigger_udev()


def _run_udevadm(arguments, error_message):
    """Run ``udevadm`` with *arguments*; print *error_message* and exit on failure.

    Failure means either the command could not be started or it returned a
    non-zero exit status.
    """
    try:
        returncode = subprocess.run(["udevadm"] + arguments).returncode
    except OSError:
        returncode = 1
    if returncode != 0:
        print(error_message)
        sys.exit(1)


def reload_udev():
    """Reload the udev rules; exit with status 1 if that fails."""
    print("Reloading udev rules")
    _run_udevadm(
        ["control", "--reload-rules"],
        "Error reloading udev rules (udevadm control --reload-rules)",
    )


def trigger_udev():
    """Trigger udev and wait for it to settle; exit with status 1 on failure."""
    print("Triggering udev rules")
    _run_udevadm(["trigger"], "Error triggering udevadm (udevadm trigger)")
    _run_udevadm(["settle"], "Error settling udevadm (udevadm settle)")


def _download_if_missing(destination, url, name):
    """Download *url* to *destination* with curl unless the file already exists."""
    if os.path.exists(destination):
        return
    print("cannot find " + destination)
    print("Attempting to download required file: " + name)
    if _run_quiet(["curl", "-o", destination, url]):
        print("error downloading " + name + " from " + url)


def verify_vdev():
    """Install the vdev udev rule and helper script; return True on success.

    The files are taken from /opt/45drives/tools (downloaded there first if
    missing) and copied into /usr/lib/udev, or /lib/udev when the former does
    not exist. Exits with status 1 if neither udev directory exists.

    Returns True only if the rules file exists and the script exists and is
    executable after the copy.
    """
    rules_copy_path = "/opt/45drives/tools/68-vdev.rules"
    script_copy_path = "/opt/45drives/tools/vdev_id_45drives"

    # Download the required scripts if they are not present in /opt/45drives/tools
    _download_if_missing(
        rules_copy_path,
        "https://scripts.45drives.com/udev/68-vdev.rules",
        "68-vdev.rules",
    )
    _download_if_missing(
        script_copy_path,
        "https://scripts.45drives.com/udev/vdev_id_45drives",
        "vdev_id_45drives",
    )

    # check for location of udev rules folder.
    udev_dir = "/usr/lib/udev"
    if not os.path.exists(udev_dir):
        print("can't find " + udev_dir)
        udev_dir = "/lib/udev"
        print("trying " + udev_dir + " instead.")
        if not os.path.exists(udev_dir):
            print("unable to locate proper udev rules folder")
            sys.exit(1)
    rules_path = udev_dir + "/rules.d/68-vdev.rules"
    script_path = udev_dir + "/vdev_id_45drives"

    # Copy the scripts from /opt/45drives/tools to their proper locations
    for source, destination in (
        (rules_copy_path, rules_path),
        (script_copy_path, script_path),
    ):
        if _run_quiet(["cp", "-f", source, destination]):
            print("error replacing " + destination)

    rules_test = os.path.exists(rules_path)
    script_test = os.path.exists(script_path)
    script_x_test = os.access(script_path, os.X_OK) if script_test else False

    # make the script executable if it is not already executable.
    if not script_test:
        print("cannot locate " + script_path)
    elif not script_x_test:
        if _run_quiet(["chmod", "+x", script_path]):
            print("error making " + script_path + " executable")
        else:
            script_x_test = os.access(script_path, os.X_OK)

    return rules_test and script_test and script_x_test


# Raw strings: the artwork contains backslashes that are not escape sequences.
# NOTE(review): spacing is reproduced exactly as found in the source file;
# confirm the rendered alignment on a terminal.
_BANNER_TOP = (
    r"{r} /$$ /$$ /$$$$$$$ {e}{w} /$$$$$$$ /$$ {e}",
    r"{r}| $$ | $$| $$____/ {e}{w}| $$__ $$ |__/ {e}",
    r"{r}| $$ | $$| $$ {e}{w}| $$ \ $$ /$$$$$$ /$$ /$$ /$$ /$$$$$$ /$$$$$$${e}",
    r"{r}| $$$$$$$$| $$$$$$$ {e}{w}| $$ | $$ /$$__ $$| $$| $$ /$$//$$__ $$ /$$_____/{e}",
    r"{r}|_____ $$|_____ $${e}{w}| $$ | $$| $$ \__/| $$ \ $$/$$/| $$$$$$$$| $$$$$$ {e}",
    r"{r} | $$ /$$ \ $${e}{w}| $$ | $$| $$ | $$ \ $$$/ | $$_____/ \____ $${e}",
    r"{r} | $$| $$$$$$/{e}{w}| $$$$$$$/| $$ | $$ \ $/ | $$$$$$$ /$$$$$$$/{e}",
    r"{r} |__/ \______/ {e}{w}|_______/ |__/ |__/ \_/ \_______/|_______/ {e}",
)
_BANNER_BOTTOM = (
    r" {r} /$${e}{w} /$$ /$$ {e}",
    r" {r} | $${e}{w} | $$|__/ {e}",
    r" {r} /$$$$$$${e}{w} /$$$$$$ | $$ /$$ /$$$$$$ /$$$$$$${e}",
    r" {r} /$$__ $${e}{w} |____ $$| $$| $$ |____ $$ /$$_____/{e}",
    r" {r}| $$ | $${e}{w} /$$$$$$$| $$| $$ /$$$$$$$| $$$$$$ {e}",
    r" {r}| $$ | $${e}{w} /$$__ $$| $$| $$ /$$__ $$ \____ $${e}",
    r" {r}| $$$$$$${e}{w}| $$$$$$$| $$| $$| $$$$$$$ /$$$$$$$/{e}",
    r" {r} \_______/{e}{w} \_______/|__/|__/ \_______/|_______/ {e}",
)


def print_welcome():
    """Print the coloured 45Drives / dalias banner and the program version."""
    colors = {
        "w": ANSI_colors["WHITE"],
        "r": ANSI_colors["RED"],
        "e": ANSI_colors["END"],
    }
    print(_SEPARATOR)
    for art_line in _BANNER_TOP:
        print(art_line.format(**colors))
    print(" ")
    print(" ")
    for art_line in _BANNER_BOTTOM:
        print(art_line.format(**colors))
    print(" v{v}".format(v=dalias_version))
    print(_SEPARATOR)


def apply_template(template_path, config_content, config_path, template_name):
    """Overwrite *config_path* with *config_content* followed by a template.

    Every line written is also echoed to stdout. *config_content* is a list
    of header strings; it is not modified, so the function can be called
    more than once with the same header.

    Prints a message and exits with status 1 if *template_path* is not a
    regular file.
    """
    print("Updating {c} with {t} template\n".format(c=config_path, t=template_name))
    if not os.path.isfile(template_path):
        print("invalid config template_path provided {p}".format(p=template_path))
        sys.exit(1)

    with open(template_path) as template_file:
        template_lines = [line + "\n" for line in template_file.read().splitlines()]

    # Build a new list rather than extending the caller's header in place.
    output_lines = list(config_content) + template_lines
    with open(config_path, "w") as config_file:
        for line in output_lines:
            print(line, end="")
            config_file.write(line)


def main():
    """Run the interactive aliasing workflow from start to finish."""
    # ensure that script has been run with root privileges
    check_root()

    dalias_config_path = "/opt/45drives/dalias/dalias.conf"
    template_dir = "/opt/45drives/dalias/example_config"
    try:
        template_choices = sorted(os.listdir(template_dir))
    except OSError:
        # Templates are optional when dalias.conf already lists aliases.
        template_choices = []

    config_content = [
        "# -----------------------------------------------------------------------------\n",
        "# /opt/45drives/dalias/dalias.conf\n",
        "# -----------------------------------------------------------------------------\n",
        "# This file is used by dalias (device aliasing program).\n",
        "# Useage: Provide a list of device aliases in this file. \n",
        "# Each entry must be on a line of its own. \n",
        "# Whitespace will be automatically trimmed from each alias name. \n",
    ]

    parser = OptionParser()
    parser.add_option(
        "-t",
        "--template",
        type="choice",
        choices=template_choices,
        action="store",
        dest="template",
        default=None,
        help="Use an alaising scheme from an existing 45Drives server template. Valid Choices are: {tc}".format(tc=template_choices),
    )
    (options, args) = parser.parse_args()

    print_welcome()

    # optparse has already validated the value against template_choices.
    if options.template is not None:
        apply_template(
            "{td}/{t}".format(td=template_dir, t=options.template),
            config_content,
            dalias_config_path,
            options.template,
        )

    aliases = load_config(dalias_config_path)

    if not aliases:
        print("No aliases found in {d}".format(d=dalias_config_path))
        selection = input("Would you like to use an aliasing scheme from an existing template? (y,n): ")
        if selection not in ["y", "Y"]:
            print("exiting..")
            sys.exit(1)
        if not template_choices:
            print("No templates found in {d}".format(d=template_dir))
            sys.exit(1)

        print("Choose a template to apply: ")
        for number, template in enumerate(template_choices, start=1):
            print(" {c}. {tn}".format(c=number, tn=template))
        template_index = input("\nSelection (1 - {c}): ".format(c=len(template_choices)))
        try:
            choice = int(template_index)
        except ValueError:
            choice = 0
        if not 1 <= choice <= len(template_choices):
            # Continuing without aliases would write an empty vdev_id.conf.
            print("Invalid selection: {s}".format(s=template_index))
            sys.exit(1)

        chosen_template = template_choices[choice - 1]
        apply_template(
            "{td}/{t}".format(td=template_dir, t=chosen_template),
            config_content,
            dalias_config_path,
            chosen_template,
        )
        aliases = load_config(dalias_config_path)
        if not aliases:
            print("No aliases found in {d}".format(d=dalias_config_path))
            sys.exit(1)

    confirm_aliases(aliases)
    devices = get_devices()
    alias_dict = detect_drives(aliases, devices)
    get_confirmation(alias_dict)


if __name__ == "__main__":
    main()