Heray-Was-Here
Server : Apache
System : Linux vps37394.inmotionhosting.com 3.10.0-1160.119.1.vz7.224.4 #1 SMP Mon Sep 30 15:36:27 MSK 2024 x86_64
User : jasonp18 ( 1000)
PHP Version : 7.4.33
Disable Function : exec,passthru,shell_exec,system
Directory :  /opt/dedrads/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Current File : //opt/dedrads/innodb_converter.py
#!/usr/lib/rads/venv/bin/python3
"""Converts all MyISAM tables to InnoDB"""
# import list type
from pathlib import Path
import os
import sys
from argparse import ArgumentParser
import logging
import pymysql

# Abort at import time unless the process user id is 0 (root); nothing below
# (log file creation, reading /root/.my.cnf) runs for other users.
if os.getuid() != 0:
    sys.exit('This script must be run as root')

logger = logging.getLogger('innodb_convert.py')
# The logger itself lets everything from DEBUG upwards through; each handler
# attached below applies its own threshold.
logger.setLevel(logging.DEBUG)
# Console handler: INFO and above, written to stderr.
console_handler = logging.StreamHandler(sys.stderr)
console_handler.setLevel(logging.INFO)
logger.addHandler(console_handler)
# File handler: DEBUG and above, written to /var/log/innodb_convert.log.
file_handler = logging.FileHandler('/var/log/innodb_convert.log')
file_handler.setLevel(logging.DEBUG)
logger.addHandler(file_handler)


def confirm_backups(check: bool, default='N'):
    """Ask the operator to confirm that backups exist before converting.

    Nothing is asked when ``check`` is true (read-only run). Otherwise the
    operator is asked whether backups were taken and, if not, whether to go
    ahead regardless. An empty reply counts as ``default``; only ``y``/``Y``
    is treated as a yes. The script exits when both questions are declined.
    """
    if check:
        return

    def said_yes(question: str) -> bool:
        # Same "(y/<default>) " suffix for every question.
        reply = input(f"{question} (y/{default}) ").strip() or default
        return reply.lower() == 'y'

    if said_yes("Have you taken backups?"):
        return
    if not said_yes("Do you wish to continue anyway?"):
        sys.exit('Exiting...')


def get_db_config() -> dict:
    """Build the pymysql connection settings from /root/.my.cnf.

    The option file is parsed leniently: blank lines and ``#``/``;`` comment
    lines are skipped, each line is split on the first ``=`` only (so a
    password may itself contain ``=``), whitespace around keys and values is
    dropped, and one pair of matching surrounding quotes is removed from the
    value. Options that appear before any ``[section]`` header are ignored.

    Returns:
        Keyword arguments for ``pymysql.connect``: host, user, password,
        charset and cursorclass. User and password come from ``[client]``.

    Exits with status 1 (after logging) when the file does not exist or when
    ``[client]`` lacks ``user`` or ``password``.
    """
    mysql_cnf = Path('/root/.my.cnf')
    if not mysql_cnf.exists():
        logger.error('Error: /root/.my.cnf does not exist')
        sys.exit(1)

    my_cnf_data = {}
    section = None

    with open(mysql_cnf, encoding='utf-8') as file:
        for raw_line in file:
            line = raw_line.strip()
            if not line or line.startswith(('#', ';')):
                continue
            if line.startswith('['):
                section = line.strip('[]').strip()
                # setdefault: a repeated header extends the section instead
                # of discarding what was read earlier.
                my_cnf_data.setdefault(section, {})
                continue
            if section is None:
                # Option outside of any section; nothing to attach it to.
                continue
            # partition keeps everything after the first '=' intact.
            key, _, value = line.partition('=')
            value = value.strip()
            if len(value) >= 2 and value[0] == value[-1] and value[0] in '\'"':
                # Drop only the enclosing quotes; quotes inside the value
                # belong to the value itself.
                value = value[1:-1]
            my_cnf_data[section][key.strip()] = value

    try:
        client = my_cnf_data['client']
        user = client['user']
        password = client['password']
    except KeyError as exc:
        logger.error(
            'Error: /root/.my.cnf has no usable [client] credentials '
            '(missing %s)',
            exc,
        )
        sys.exit(1)

    db_config_dict = {
        'host': 'localhost',
        'user': user,
        'password': password,
        'charset': 'utf8mb4',
        'cursorclass': pymysql.cursors.DictCursor,
    }
    return db_config_dict


# Connection settings are loaded once, at import time, and shared by every
# function in this module that calls pymysql.connect(**db_config).
db_config = get_db_config()


def list_databases() -> list:
    """Return the name of every database reported by ``SHOW DATABASES``.

    Any MySQL error terminates the script with the error text as the exit
    message.
    """
    try:
        with pymysql.connect(**db_config) as conn, conn.cursor() as cur:
            cur.execute('SHOW DATABASES')
            return [row['Database'] for row in cur.fetchall()]
    except pymysql.err.MySQLError as exc:
        sys.exit(f'Error: {exc}')


def list_tables(database: str) -> tuple[str | None, list[str] | None]:
    """List the tables of ``database`` via ``SHOW TABLES``.

    Returns:
        ``(None, table_names)`` on success, or ``(error_message, None)``
        after logging when MySQL reports an error.
    """
    try:
        with pymysql.connect(database=database, **db_config) as conn, \
                conn.cursor() as cur:
            cur.execute('SHOW TABLES')
            # The single result column is named after the database.
            column = 'Tables_in_' + database
            return None, [row[column] for row in cur.fetchall()]
    except pymysql.err.MySQLError as exc:
        message = f'Error on {database}: {exc}'
        logger.error(message)
        return message, None


def check_engine(database: str, table: str) -> tuple[str | None, str | None]:
    """Look up the storage engine of ``database.table``.

    ``SHOW TABLE STATUS LIKE`` takes a pattern, so ``_`` and ``%`` inside the
    table name act as wildcards and the statement can return rows for other
    tables. All rows are therefore fetched and the one whose ``Name`` equals
    ``table`` is selected, instead of trusting the first row.

    Returns:
        ``(None, engine)`` on success; ``engine`` is whatever MySQL reports
        in the ``Engine`` column and may be ``None`` (e.g. for views).
        ``(error_message, None)`` after logging when MySQL raises an error or
        no status row matches the table.
    """
    try:
        with pymysql.connect(database=database, **db_config) as connection:
            with connection.cursor() as cursor:
                cursor.execute('SHOW TABLE STATUS LIKE %s', (table,))
                rows = cursor.fetchall()
    except pymysql.err.MySQLError as e:
        error = f'Error on {database}.{table}: {e}'
        logger.error(error)
        return error, None

    # Prefer the exact name; fall back to a case-insensitive match for
    # servers that fold table-name case.
    status = next((row for row in rows if row['Name'] == table), None)
    if status is None:
        wanted = table.lower()
        status = next(
            (row for row in rows if str(row['Name']).lower() == wanted), None
        )
    if status is None:
        error = f'Error on {database}.{table}: no table status returned'
        logger.error(error)
        return error, None
    return None, status['Engine']


def convert_table(database: str, table: str, target) -> str | None:
    """Convert ``database.table`` to the ``target`` storage engine.

    Args:
        database: Database that contains the table.
        table: Table name exactly as reported by MySQL.
        target: Engine name placed verbatim after ``ENGINE=``; callers in
            this file pass ``'InnoDB'`` or ``'MyISAM'``.

    Returns:
        ``None`` on success, or the logged error message when MySQL rejects
        the statement.
    """
    # Identifiers cannot be bound as parameters, so quote by hand: double any
    # embedded backtick AND wrap the name in backticks. Without the wrapping,
    # reserved words or names with special characters break the statement.
    quoted_table = '`' + table.replace('`', '``') + '`'
    try:
        with pymysql.connect(database=database, **db_config) as connection:
            with connection.cursor() as cursor:
                cursor.execute(f'ALTER TABLE {quoted_table} ENGINE={target}')
                return None
    except pymysql.err.MySQLError as e:
        # Name the table so the end-of-run summary shows what failed.
        error = f'Error on {database}.{table}: {e}'
        logger.error(error)
        return error


def get_mysql_innodb_settings() -> tuple[list, dict]:
    """Returns the innodb settings from my.cnf and my.cnf.d"""
    conf_dir = Path('/etc/my.cnf.d')
    conf_files = list(conf_dir.glob('*.cnf'))
    main_conf = Path('/etc/my.cnf')
    innodb_settings = {}
    key_errors = []
    conf_files.append(main_conf)
    for file in conf_files:
        innodb_settings[str(file.name)] = {}
        if file.exists():
            with file.open('r', encoding='utf-8') as f:
                for line in f:
                    if line.startswith('innodb_'):
                        try:
                            key, value = line.strip().split('=')
                            key = key.strip()
                        except ValueError:
                            key = line.strip()
                            value = ''
                        for _, settings in innodb_settings.items():
                            if key in settings:
                                error = (
                                    f'Error: {key} is defined in multiple files'
                                )
                                key_errors.append(error)
                        innodb_settings[str(file.name)][key] = value
    return key_errors, innodb_settings


def clean_db_list(databases: list) -> list:
    """Strip MySQL's system schemas out of ``databases``.

    The list is edited in place (one occurrence of each system name is
    removed) and the same list object is returned.
    """
    system_schemas = (
        'information_schema',
        'performance_schema',
        'mysql',
        'sys',
        'test',
    )
    for schema in system_schemas:
        try:
            databases.remove(schema)
        except ValueError:
            # Not present in this server's list; nothing to remove.
            continue
    return databases


def process_databases(databases: list, args) -> list:
    """Check or convert the tables of every database in ``databases``.

    Args:
        databases: Database names to process.
        args: Parsed command-line namespace; ``myisam``, ``table`` and
            ``check`` are read.

    Returns:
        The error messages collected along the way (empty when all went
        well). A failure on one database or table does not stop the others.
    """
    errors = []
    # --myisam reverses the direction of the conversion.
    if args.myisam:
        current, target = 'InnoDB', 'MyISAM'
    else:
        current, target = 'MyISAM', 'InnoDB'
    for database in databases:
        error, tables = list_tables(database)
        # Handle the listing failure first: ``tables`` is None in that case
        # and must not be searched for --table.
        if error:
            errors.append(error)
            continue
        if args.table:
            if args.table not in tables:
                error = f'Error: {args.table} does not exist in {database}'
                logger.error(error)
                errors.append(error)
                continue
            tables = [args.table]
        for table in tables:
            error, engine = check_engine(database, table)
            if error:
                errors.append(error)
                continue
            if args.check:
                logger.info('%s.%s is %s', database, table, engine)
                continue
            # Views (and similar objects) report no engine; nothing to convert.
            if not engine or engine.lower() != current.lower():
                continue
            logger.info('Converting %s.%s to %s', database, table, target)
            error = convert_table(database, table, target)
            if error:
                errors.append(error)
    return errors


def main():
    """Command-line entry point.

    Parses the options, validates their combination, asks for backup
    confirmation (skipped for --check), processes the selected databases and
    finally logs any collected errors followed by the InnoDB settings found
    in the MySQL option files.
    """
    parser = ArgumentParser(description='Converts MyISAM tables to InnoDB')
    # (flags, keyword arguments) in the order they appear in --help.
    option_specs = (
        (('-a', '--all'),
         {'help': 'Select all databases', 'action': 'store_true'}),
        (('-d', '--database'),
         {'help': 'Database to convert'}),
        (('-t', '--table'),
         {'help': 'Specify a table to convert. combine with -d'}),
        (('--check',),
         {'help': 'Check selected databases', 'action': 'store_true'}),
        (('--myisam',),
         {'help': 'Convert to MyISAM instead of InnoDB', 'action': 'store_true'}),
    )
    for flags, options in option_specs:
        parser.add_argument(*flags, **options)
    args = parser.parse_args()

    # Each failed validation terminates the script, so plain guards suffice.
    if args.all and args.database:
        logger.error('Error: --all and --database are mutually exclusive')
        parser.print_help()
        sys.exit(1)
    if not (args.all or args.database):
        logger.error('Error: No database specified')
        logger.error('must use either --all (-a) or --database (-d) flags')
        parser.print_help()
        sys.exit(1)
    if args.table and not args.database:
        parser.error('--table requires --database')

    confirm_backups(args.check)
    databases = clean_db_list(list_databases())
    if args.database:
        if args.database not in databases:
            error = f'Error: {args.database} does not exist or is a system database'
            logger.error(error)
            sys.exit(error)
        databases = [args.database]

    errors = process_databases(databases, args)
    key_errors, innodb_settings = get_mysql_innodb_settings()
    if errors or key_errors:
        logger.error('Errors occurred:')
        for problem in [*errors, *key_errors]:
            logger.error("\t%s", problem)

    logger.info('InnoDB settings:')
    for filename, settings in innodb_settings.items():
        logger.info('%s', filename)
        for key, value in settings.items():
            if value != '':
                logger.info('\t%s = %s', key, value)
            else:
                # Option written without a value.
                logger.info('\t%s', key)


# Run the command-line interface only when this file is executed as a script.
if __name__ == '__main__':
    main()

Hry