Source code for sqlite3_to_mysql.transporter

"""Use to transfer an SQLite 3 database to MySQL."""

import logging
import os
import re
import sqlite3
import typing as t
from datetime import timedelta
from decimal import Decimal
from itertools import chain
from math import ceil
from os.path import isfile, realpath
from sys import stdout

import mysql.connector
from mysql.connector import CharacterSet
from mysql.connector import __version__ as mysql_connector_version_string
from mysql.connector import errorcode
from packaging import version
from tqdm import tqdm, trange


try:
    # Python 3.11+
    from typing import Unpack  # type: ignore[attr-defined]
except ImportError:
    # Python < 3.11
    from typing_extensions import Unpack  # type: ignore

from sqlite3_to_mysql.sqlite_utils import (
    adapt_decimal,
    adapt_timedelta,
    check_sqlite_table_xinfo_support,
    convert_date,
    convert_decimal,
    convert_timedelta,
    unicase_compare,
)

from .mysql_utils import (
    MYSQL_BLOB_COLUMN_TYPES,
    MYSQL_COLUMN_TYPES,
    MYSQL_COLUMN_TYPES_WITHOUT_DEFAULT,
    MYSQL_INSERT_METHOD,
    MYSQL_TEXT_COLUMN_TYPES,
    MYSQL_TEXT_COLUMN_TYPES_WITH_JSON,
    check_mysql_fulltext_support,
    check_mysql_json_support,
    check_mysql_values_alias_support,
    safe_identifier_length,
)
from .types import SQLite3toMySQLAttributes, SQLite3toMySQLParams


[docs] class SQLite3toMySQL(SQLite3toMySQLAttributes): """Use this class to transfer an SQLite 3 database to MySQL.""" COLUMN_PATTERN: t.Pattern[str] = re.compile(r"^[^(]+") COLUMN_LENGTH_PATTERN: t.Pattern[str] = re.compile(r"\(\d+\)") COLUMN_PRECISION_AND_SCALE_PATTERN: t.Pattern[str] = re.compile(r"\(\d+,\d+\)") COLUMN_UNSIGNED_PATTERN: t.Pattern[str] = re.compile(r"\bUNSIGNED\b", re.IGNORECASE) MYSQL_CONNECTOR_VERSION: version.Version = version.parse(mysql_connector_version_string)
[docs] def __init__(self, **kwargs: Unpack[SQLite3toMySQLParams]): """Constructor.""" if kwargs.get("sqlite_file") is None: raise ValueError("Please provide an SQLite file") elif not isfile(str(kwargs.get("sqlite_file"))): raise FileNotFoundError("SQLite file does not exist") else: self._sqlite_file = realpath(str(kwargs.get("sqlite_file"))) if kwargs.get("mysql_user") is not None: self._mysql_user = str(kwargs.get("mysql_user")) else: raise ValueError("Please provide a MySQL user") self._mysql_password = str(kwargs.get("mysql_password")) if kwargs.get("mysql_password") else None self._mysql_host = str(kwargs.get("mysql_host", "localhost")) self._mysql_port = kwargs.get("mysql_port", 3306) or 3306 if kwargs.get("mysql_socket") is not None: if not os.path.exists(str(kwargs.get("mysql_socket"))): raise FileNotFoundError("MySQL socket does not exist") else: self._mysql_socket = realpath(str(kwargs.get("mysql_socket"))) self._mysql_host = None self._mysql_port = None else: self._mysql_socket = None self._sqlite_tables = kwargs.get("sqlite_tables") or tuple() self._without_foreign_keys = bool(self._sqlite_tables) or bool(kwargs.get("without_foreign_keys", False)) self._mysql_ssl_disabled = bool(kwargs.get("mysql_ssl_disabled", False)) self._chunk_size = bool(kwargs.get("chunk")) self._quiet = bool(kwargs.get("quiet", False)) self._logger = self._setup_logger(log_file=kwargs.get("log_file", None), quiet=self._quiet) self._mysql_database = kwargs.get("mysql_database", "transfer") or "transfer" self._mysql_insert_method = str(kwargs.get("mysql_insert_method", "IGNORE")).upper() if self._mysql_insert_method not in MYSQL_INSERT_METHOD: self._mysql_insert_method = "IGNORE" self._mysql_truncate_tables = bool(kwargs.get("mysql_truncate_tables", False)) self._mysql_integer_type = str(kwargs.get("mysql_integer_type", "INT(11)")).upper() self._mysql_string_type = str(kwargs.get("mysql_string_type", "VARCHAR(255)")).upper() self._mysql_text_type = str(kwargs.get("mysql_text_type", "TEXT")).upper() if self._mysql_text_type not in MYSQL_TEXT_COLUMN_TYPES: self._mysql_text_type = "TEXT" self._mysql_charset = kwargs.get("mysql_charset", "utf8mb4") or "utf8mb4" self._mysql_collation = ( kwargs.get("mysql_collation") or CharacterSet().get_default_collation(self._mysql_charset.lower())[0] ) if not kwargs.get("mysql_collation") and self._mysql_collation == "utf8mb4_0900_ai_ci": self._mysql_collation = "utf8mb4_unicode_ci" self._ignore_duplicate_keys = kwargs.get("ignore_duplicate_keys", False) or False self._use_fulltext = kwargs.get("use_fulltext", False) or False self._with_rowid = kwargs.get("with_rowid", False) or False sqlite3.register_adapter(Decimal, adapt_decimal) sqlite3.register_converter("DECIMAL", convert_decimal) sqlite3.register_adapter(timedelta, adapt_timedelta) sqlite3.register_converter("DATE", convert_date) sqlite3.register_converter("TIME", convert_timedelta) self._sqlite = sqlite3.connect(realpath(self._sqlite_file), detect_types=sqlite3.PARSE_DECLTYPES) self._sqlite.row_factory = sqlite3.Row self._sqlite.create_collation("unicase", unicase_compare) self._sqlite_cur = self._sqlite.cursor() self._sqlite_version = self._get_sqlite_version() self._sqlite_table_xinfo_support = check_sqlite_table_xinfo_support(self._sqlite_version) self._mysql_create_tables = bool(kwargs.get("mysql_create_tables", True)) self._mysql_transfer_data = bool(kwargs.get("mysql_transfer_data", True)) if not self._mysql_transfer_data and not self._mysql_create_tables: raise ValueError("Unable to continue without transferring data or creating tables!") connection_args: t.Dict[str, t.Any] = { "user": self._mysql_user, "use_pure": True, "charset": self._mysql_charset, "collation": self._mysql_collation, } if self._mysql_password is not None: connection_args["password"] = self._mysql_password if self._mysql_socket is not None: connection_args["unix_socket"] = self._mysql_socket else: connection_args["host"] = self._mysql_host connection_args["port"] = self._mysql_port if self._mysql_ssl_disabled: connection_args["ssl_disabled"] = True try: _mysql_connection = mysql.connector.connect(**connection_args) if isinstance(_mysql_connection, mysql.connector.MySQLConnection): self._mysql = _mysql_connection else: raise ConnectionError("Unable to connect to MySQL") if not self._mysql.is_connected(): raise ConnectionError("Unable to connect to MySQL") self._mysql_cur = self._mysql.cursor(prepared=True) try: self._mysql.database = self._mysql_database except mysql.connector.Error as err: if err.errno == errorcode.ER_BAD_DB_ERROR: self._create_database() else: self._logger.error(err) raise self._mysql_version = self._get_mysql_version() self._mysql_json_support = check_mysql_json_support(self._mysql_version) self._mysql_fulltext_support = check_mysql_fulltext_support(self._mysql_version) if self._use_fulltext and not self._mysql_fulltext_support: raise ValueError("Your MySQL version does not support InnoDB FULLTEXT indexes!") except mysql.connector.Error as err: self._logger.error(err) raise
@classmethod def _setup_logger( cls, log_file: t.Optional[t.Union[str, "os.PathLike[t.Any]"]] = None, quiet: bool = False ) -> logging.Logger: formatter = logging.Formatter(fmt="%(asctime)s %(levelname)-8s %(message)s", datefmt="%Y-%m-%d %H:%M:%S") logger = logging.getLogger(cls.__name__) logger.setLevel(logging.DEBUG) if not quiet: screen_handler = logging.StreamHandler(stream=stdout) screen_handler.setFormatter(formatter) logger.addHandler(screen_handler) if log_file: file_handler = logging.FileHandler(realpath(log_file), mode="w") file_handler.setFormatter(formatter) logger.addHandler(file_handler) return logger def _get_mysql_version(self) -> str: try: self._mysql_cur.execute("SHOW VARIABLES LIKE 'version'") row = self._mysql_cur.fetchone() if row: return str(row[1]) else: self._logger.error("MySQL failed checking for InnoDB version") raise mysql.connector.Error("MySQL failed checking for InnoDB version") except (IndexError, mysql.connector.Error) as err: self._logger.error( "MySQL failed checking for InnoDB version: %s", err, ) raise def _get_sqlite_version(self) -> str: try: self._sqlite_cur.execute("SELECT sqlite_version()") return str(self._sqlite_cur.fetchone()[0]) except (IndexError, sqlite3.Error) as err: self._logger.error( "SQLite failed checking for InnoDB version: %s", err, ) raise def _sqlite_table_has_rowid(self, table: str) -> bool: try: self._sqlite_cur.execute(f'SELECT rowid FROM "{table}" LIMIT 1') self._sqlite_cur.fetchall() return True except sqlite3.OperationalError: return False def _create_database(self) -> None: try: self._mysql_cur.execute( f""" CREATE DATABASE IF NOT EXISTS `{self._mysql_database}` DEFAULT CHARACTER SET {self._mysql_charset} DEFAULT COLLATE {self._mysql_collation} """ ) self._mysql_cur.close() self._mysql.commit() self._mysql.database = self._mysql_database self._mysql_cur = self._mysql.cursor(prepared=True) # pylint: disable=W0201 except mysql.connector.Error as err: self._logger.error( "MySQL failed creating databse %s: %s", self._mysql_database, err, ) raise @classmethod def _valid_column_type(cls, column_type: str) -> t.Optional[t.Match[str]]: return cls.COLUMN_PATTERN.match(column_type.strip()) def _translate_type_from_sqlite_to_mysql(self, column_type: str) -> str: """This could be optimized even further, however is seems adequate.""" full_column_type: str = column_type.upper() unsigned: bool = self.COLUMN_UNSIGNED_PATTERN.search(full_column_type) is not None match: t.Optional[t.Match[str]] = self._valid_column_type(column_type) if not match: raise ValueError(f'"{column_type}" is not a valid column_type!') data_type: str = match.group(0).upper() if data_type in {"TEXT", "CLOB", "STRING"}: return self._mysql_text_type if data_type in {"CHARACTER", "NCHAR", "NATIVE CHARACTER"}: return "CHAR" + self._column_type_length(column_type) if data_type in {"VARYING CHARACTER", "NVARCHAR", "VARCHAR"}: if self._mysql_string_type in MYSQL_TEXT_COLUMN_TYPES: return self._mysql_string_type length = self._column_type_length(column_type) if not length: return self._mysql_string_type match = self._valid_column_type(self._mysql_string_type) if match: return match.group(0).upper() + length if data_type == "UNSIGNED BIG INT": return f"BIGINT{self._column_type_length(column_type)} UNSIGNED" if data_type.startswith(("TINYINT", "INT1")): return f"TINYINT{self._column_type_length(column_type)}{' UNSIGNED' if unsigned else ''}" if data_type.startswith(("SMALLINT", "INT2")): return f"SMALLINT{self._column_type_length(column_type)}{' UNSIGNED' if unsigned else ''}" if data_type.startswith(("MEDIUMINT", "INT3")): return f"MEDIUMINT{self._column_type_length(column_type)}{' UNSIGNED' if unsigned else ''}" if data_type.startswith("INT4"): return f"INT{self._column_type_length(column_type)}{' UNSIGNED' if unsigned else ''}" if data_type.startswith(("BIGINT", "INT8")): return f"BIGINT{self._column_type_length(column_type)}{' UNSIGNED' if unsigned else ''}" if data_type.startswith(("INT64", "NUMERIC")): if data_type == "NUMERIC" and self._column_type_precision_and_scale(full_column_type) != "": return f"DECIMAL{self._column_type_precision_and_scale(column_type)}{' UNSIGNED' if unsigned else ''}" return f"BIGINT{self._column_type_length(column_type, 19)}{' UNSIGNED' if unsigned else ''}" if data_type.startswith(("INTEGER", "INT")): length = self._column_type_length(column_type) if not length: if "UNSIGNED" in self._mysql_integer_type: return self._mysql_integer_type return f"{self._mysql_integer_type}{' UNSIGNED' if unsigned else ''}" match = self._valid_column_type(self._mysql_integer_type) if match: if "UNSIGNED" in self._mysql_integer_type: return f"{match.group(0).upper()}{length} UNSIGNED" return f"{match.group(0).upper()}{length}{' UNSIGNED' if unsigned else ''}" if data_type in {"BOOL", "BOOLEAN"}: return "TINYINT(1)" if data_type.startswith(("REAL", "DOUBLE", "FLOAT", "DECIMAL", "DEC", "FIXED")): return full_column_type if data_type not in MYSQL_COLUMN_TYPES: return self._mysql_string_type return full_column_type @classmethod def _column_type_length(cls, column_type: str, default: t.Optional[t.Union[str, int, float]] = None) -> str: suffix: t.Optional[t.Match[str]] = cls.COLUMN_LENGTH_PATTERN.search(column_type) if suffix: return suffix.group(0) if default: return f"({default})" return "" @classmethod def _column_type_precision_and_scale(cls, column_type: str) -> str: suffix: t.Optional[t.Match[str]] = cls.COLUMN_PRECISION_AND_SCALE_PATTERN.search(column_type) if suffix: return suffix.group(0) return "" def _create_table(self, table_name: str, transfer_rowid: bool = False) -> None: primary_keys: t.List[t.Dict[str, str]] = [] sql: str = f"CREATE TABLE IF NOT EXISTS `{safe_identifier_length(table_name)}` ( " if transfer_rowid: sql += " `rowid` BIGINT NOT NULL, " if self._sqlite_table_xinfo_support: self._sqlite_cur.execute(f'PRAGMA table_xinfo("{table_name}")') else: self._sqlite_cur.execute(f'PRAGMA table_info("{table_name}")') rows: t.List[t.Any] = self._sqlite_cur.fetchall() compound_primary_key: bool = len(tuple(True for row in rows if dict(row)["pk"] > 0)) > 1 for row in rows: column: t.Dict[str, t.Any] = dict(row) mysql_safe_name: str = safe_identifier_length(column["name"]) column_type: str = self._translate_type_from_sqlite_to_mysql(column["type"]) # The "hidden" value is 0 for visible columns, 1 for "hidden" columns, # 2 for computed virtual columns and 3 for computed stored columns. # Read more on hidden columns here https://www.sqlite.org/pragma.html#pragma_table_xinfo if "hidden" in column and column["hidden"] == 1: continue auto_increment: bool = ( column["pk"] > 0 and column_type.startswith(("INT", "BIGINT")) and not compound_primary_key ) sql += " `{name}` {type} {notnull} {default} {auto_increment}, ".format( name=mysql_safe_name, type=column_type, notnull="NOT NULL" if column["notnull"] or column["pk"] else "NULL", auto_increment="AUTO_INCREMENT" if auto_increment else "", default=( "DEFAULT " + column["dflt_value"] if column["dflt_value"] and column_type not in MYSQL_COLUMN_TYPES_WITHOUT_DEFAULT and not auto_increment else "" ), ) if column["pk"] > 0: primary_key: t.Dict[str, str] = { "column": mysql_safe_name, "length": "", } # In case we have a non-numeric primary key if column_type in ( MYSQL_TEXT_COLUMN_TYPES_WITH_JSON + MYSQL_BLOB_COLUMN_TYPES ) or column_type.startswith(("CHAR", "VARCHAR")): primary_key["length"] = self._column_type_length(column_type, 255) primary_keys.append(primary_key) sql = sql.rstrip(", ") if len(primary_keys) > 0: sql += ", PRIMARY KEY ({columns})".format( columns=", ".join("`{column}`{length}".format(**primary_key) for primary_key in primary_keys) ) if transfer_rowid: sql += f", CONSTRAINT `{safe_identifier_length(table_name)}_rowid` UNIQUE (`rowid`)" sql += f" ) ENGINE=InnoDB DEFAULT CHARSET={self._mysql_charset} COLLATE={self._mysql_collation}" try: self._mysql_cur.execute(sql) self._mysql.commit() except mysql.connector.Error as err: self._logger.error( "MySQL failed creating table %s: %s", safe_identifier_length(table_name), err, ) raise def _truncate_table(self, table_name: str) -> None: self._mysql_cur.execute( """ SELECT `TABLE_NAME` FROM `INFORMATION_SCHEMA`.`TABLES` WHERE `TABLE_SCHEMA` = %s AND `TABLE_NAME` = %s LIMIT 1 """, (self._mysql_database, safe_identifier_length(table_name)), ) if len(self._mysql_cur.fetchall()) > 0: self._logger.info("Truncating table %s", safe_identifier_length(table_name)) self._mysql_cur.execute(f"TRUNCATE TABLE `{safe_identifier_length(table_name)}`") def _add_indices(self, table_name: str) -> None: self._sqlite_cur.execute(f'PRAGMA table_info("{table_name}")') table_columns: t.Dict[str, str] = {} for row in self._sqlite_cur.fetchall(): column: t.Dict[str, t.Any] = dict(row) table_columns[column["name"]] = column["type"] self._sqlite_cur.execute(f'PRAGMA index_list("{table_name}")') indices: t.Tuple[t.Dict[str, t.Any], ...] = tuple(dict(row) for row in self._sqlite_cur.fetchall()) for index in indices: if index["origin"] == "pk": continue self._sqlite_cur.execute(f'PRAGMA index_info("{index["name"]}")') index_infos: t.Tuple[t.Dict[str, t.Any], ...] = tuple(dict(row) for row in self._sqlite_cur.fetchall()) index_type: str = "UNIQUE" if int(index["unique"]) == 1 else "INDEX" if any( table_columns[index_info["name"]].upper() in MYSQL_TEXT_COLUMN_TYPES_WITH_JSON for index_info in index_infos ): if self._use_fulltext and self._mysql_fulltext_support: # Use fulltext if requested and available index_type = "FULLTEXT" index_columns: str = ",".join( f'`{safe_identifier_length(index_info["name"])}`' for index_info in index_infos ) else: # Limit the max TEXT field index length to 255 index_columns = ", ".join( "`{column}`{length}".format( column=safe_identifier_length(index_info["name"]), length=( "(255)" if table_columns[index_info["name"]].upper() in MYSQL_TEXT_COLUMN_TYPES_WITH_JSON else "" ), ) for index_info in index_infos ) else: column_list: t.List[str] = [] for index_info in index_infos: index_length: str = "" # Limit the max BLOB field index length to 255 if table_columns[index_info["name"]].upper() in MYSQL_BLOB_COLUMN_TYPES: index_length = "(255)" else: suffix: t.Optional[t.Match[str]] = self.COLUMN_LENGTH_PATTERN.search( table_columns[index_info["name"]] ) if suffix: index_length = suffix.group(0) column_list.append(f'`{safe_identifier_length(index_info["name"])}`{index_length}') index_columns = ", ".join(column_list) try: self._add_index( table_name=table_name, index_type=index_type, index=index, index_columns=index_columns, index_infos=index_infos, ) except mysql.connector.Error as err: if err.errno == errorcode.ER_BAD_FT_COLUMN and index_type == "FULLTEXT": # handle bad FULLTEXT index self._add_index( table_name=table_name, index_type="UNIQUE" if int(index["unique"]) == 1 else "INDEX", index=index, index_columns=", ".join( "`{column}`{length}".format( column=safe_identifier_length(index_info["name"]), length=( "(255)" if table_columns[index_info["name"]].upper() in MYSQL_TEXT_COLUMN_TYPES_WITH_JSON else "" ), ) for index_info in index_infos ), index_infos=index_infos, ) else: raise def _add_index( self, table_name: str, index_type: str, index: t.Dict[str, t.Any], index_columns: str, index_infos: t.Tuple[t.Dict[str, t.Any], ...], index_iteration: int = 0, ) -> None: sql: str = ( """ ALTER TABLE `{table}` ADD {index_type} `{name}`({columns}) """.format( table=safe_identifier_length(table_name), index_type=index_type, name=( safe_identifier_length(index["name"]) if index_iteration == 0 else f'{safe_identifier_length(index["name"], max_length=60)}_{index_iteration}' ), columns=index_columns, ) ) try: self._logger.info( """Adding %s to column "%s" in table %s""", "unique index" if int(index["unique"]) == 1 else "index", ", ".join(safe_identifier_length(index_info["name"]) for index_info in index_infos), safe_identifier_length(table_name), ) self._mysql_cur.execute(sql) self._mysql.commit() except mysql.connector.Error as err: if err.errno == errorcode.ER_DUP_KEYNAME: if not self._ignore_duplicate_keys: # handle a duplicate key name self._add_index( table_name=table_name, index_type=index_type, index=index, index_columns=index_columns, index_infos=index_infos, index_iteration=index_iteration + 1, ) self._logger.warning( """Duplicate key "%s" in table %s detected! Trying to create new key "%s_%s" ...""", safe_identifier_length(index["name"]), safe_identifier_length(table_name), safe_identifier_length(index["name"]), index_iteration + 1, ) else: self._logger.warning( """Ignoring duplicate key "%s" in table %s!""", safe_identifier_length(index["name"]), safe_identifier_length(table_name), ) elif err.errno == errorcode.ER_BAD_FT_COLUMN: # handle bad FULLTEXT index self._logger.warning( """Failed adding FULLTEXT index to column "%s" in table %s. Retrying without FULLTEXT ...""", ", ".join(safe_identifier_length(index_info["name"]) for index_info in index_infos), safe_identifier_length(table_name), ) raise else: self._logger.error( """MySQL failed adding index to column "%s" in table %s: %s""", ", ".join(safe_identifier_length(index_info["name"]) for index_info in index_infos), safe_identifier_length(table_name), err, ) raise def _add_foreign_keys(self, table_name: str) -> None: self._sqlite_cur.execute(f'PRAGMA foreign_key_list("{table_name}")') for row in self._sqlite_cur.fetchall(): foreign_key: t.Dict[str, t.Any] = dict(row) sql = """ ALTER TABLE `{table}` ADD CONSTRAINT `{table}_FK_{id}_{seq}` FOREIGN KEY (`{column}`) REFERENCES `{ref_table}`(`{ref_column}`) ON DELETE {on_delete} ON UPDATE {on_update} """.format( id=foreign_key["id"], seq=foreign_key["seq"], table=safe_identifier_length(table_name), column=safe_identifier_length(foreign_key["from"]), ref_table=safe_identifier_length(foreign_key["table"]), ref_column=safe_identifier_length(foreign_key["to"]), on_delete=( foreign_key["on_delete"].upper() if foreign_key["on_delete"].upper() != "SET DEFAULT" else "NO ACTION" ), on_update=( foreign_key["on_update"].upper() if foreign_key["on_update"].upper() != "SET DEFAULT" else "NO ACTION" ), ) try: self._logger.info( "Adding foreign key to %s.%s referencing %s.%s", safe_identifier_length(table_name), safe_identifier_length(foreign_key["from"]), safe_identifier_length(foreign_key["table"]), safe_identifier_length(foreign_key["to"]), ) self._mysql_cur.execute(sql) self._mysql.commit() except mysql.connector.Error as err: self._logger.error( "MySQL failed adding foreign key to %s.%s referencing %s.%s: %s", safe_identifier_length(table_name), safe_identifier_length(foreign_key["from"]), safe_identifier_length(foreign_key["table"]), safe_identifier_length(foreign_key["to"]), err, ) raise def _transfer_table_data(self, sql: str, total_records: int = 0) -> None: if self._chunk_size is not None and self._chunk_size > 0: for _ in trange(0, int(ceil(total_records / self._chunk_size)), disable=self._quiet): self._mysql_cur.executemany( sql, (tuple(row) for row in self._sqlite_cur.fetchmany(self._chunk_size)), # type: ignore ) else: self._mysql_cur.executemany( sql, ( # type: ignore tuple(row) for row in tqdm( self._sqlite_cur.fetchall(), total=total_records, disable=self._quiet, ) ), ) self._mysql.commit()
[docs] def transfer(self) -> None: """The primary and only method with which we transfer all the data.""" if len(self._sqlite_tables) > 0: # transfer only specific tables self._sqlite_cur.execute( f""" SELECT name FROM sqlite_master WHERE type='table' AND name NOT LIKE 'sqlite_%' AND name IN({("?, " * len(self._sqlite_tables)).rstrip(" ,")}) """, self._sqlite_tables, ) else: # transfer all tables self._sqlite_cur.execute( """ SELECT name FROM sqlite_master WHERE type='table' AND name NOT LIKE 'sqlite_%' """ ) try: self._mysql_cur.execute("SET FOREIGN_KEY_CHECKS=0") for row in self._sqlite_cur.fetchall(): table: t.Dict[str, t.Any] = dict(row) # check if we're transferring rowid transfer_rowid: bool = self._with_rowid and self._sqlite_table_has_rowid(table["name"]) # create the table if self._mysql_create_tables: self._create_table(table["name"], transfer_rowid=transfer_rowid) # truncate the table on request if self._mysql_truncate_tables: self._truncate_table(table["name"]) # get the size of the data if self._mysql_transfer_data: self._sqlite_cur.execute(f'SELECT COUNT(*) AS total_records FROM "{table["name"]}"') total_records = int(dict(self._sqlite_cur.fetchone())["total_records"]) else: total_records = 0 # only continue if there is anything to transfer if total_records > 0: # populate it self._logger.info("Transferring table %s", table["name"]) self._sqlite_cur.execute( '''SELECT {rowid} * FROM "{table_name}"'''.format( rowid='rowid as "rowid",' if transfer_rowid else "", table_name=table["name"], ) ) columns: t.List[str] = [ safe_identifier_length(column[0]) for column in self._sqlite_cur.description ] sql: str if self._mysql_insert_method.upper() == "UPDATE": sql = """ INSERT INTO `{table}` ({fields}) {values_clause} ON DUPLICATE KEY UPDATE {field_updates} """.format( table=safe_identifier_length(table["name"]), fields=("`{}`, " * len(columns)).rstrip(" ,").format(*columns), values_clause=( "VALUES ({placeholders}) AS `__new__`" if check_mysql_values_alias_support(self._mysql_version) else "VALUES ({placeholders})" ).format(placeholders=("%s, " * len(columns)).rstrip(" ,")), field_updates=( ("`{}`=`__new__`.`{}`, " * len(columns)).rstrip(" ,") if check_mysql_values_alias_support(self._mysql_version) else ("`{}`=`{}`, " * len(columns)).rstrip(" ,") ).format(*list(chain.from_iterable((column, column) for column in columns))), ) else: sql = """ INSERT {ignore} INTO `{table}` ({fields}) VALUES ({placeholders}) """.format( ignore="IGNORE" if self._mysql_insert_method.upper() == "IGNORE" else "", table=safe_identifier_length(table["name"]), fields=("`{}`, " * len(columns)).rstrip(" ,").format(*columns), placeholders=("%s, " * len(columns)).rstrip(" ,"), ) try: self._transfer_table_data(sql=sql, total_records=total_records) except mysql.connector.Error as err: self._logger.error( "MySQL transfer failed inserting data into table %s: %s", safe_identifier_length(table["name"]), err, ) raise # add indices if self._mysql_create_tables: self._add_indices(table["name"]) # add foreign keys if self._mysql_create_tables and not self._without_foreign_keys: self._add_foreign_keys(table["name"]) except Exception: # pylint: disable=W0706 raise finally: self._mysql_cur.execute("SET FOREIGN_KEY_CHECKS=1") self._logger.info("Done!")