Source code for sqlite3_to_mysql.transporter

"""Use to transfer an SQLite 3 database to MySQL."""

import logging
import os
import re
import sqlite3
import typing as t
from datetime import timedelta
from decimal import Decimal
from itertools import chain
from math import ceil
from os.path import isfile, realpath
from sys import stdout

import mysql.connector
import typing_extensions as tx
from mysql.connector import CharacterSet
from mysql.connector import __version__ as mysql_connector_version_string
from mysql.connector import errorcode
from packaging import version
from tqdm import tqdm, trange

from sqlite3_to_mysql.sqlite_utils import (
    adapt_decimal,
    adapt_timedelta,
    check_sqlite_table_xinfo_support,
    convert_date,
    convert_decimal,
    convert_timedelta,
    unicase_compare,
)

from .mysql_utils import (
    MYSQL_BLOB_COLUMN_TYPES,
    MYSQL_COLUMN_TYPES,
    MYSQL_COLUMN_TYPES_WITHOUT_DEFAULT,
    MYSQL_INSERT_METHOD,
    MYSQL_TEXT_COLUMN_TYPES,
    MYSQL_TEXT_COLUMN_TYPES_WITH_JSON,
    check_mysql_fulltext_support,
    check_mysql_json_support,
    check_mysql_values_alias_support,
    safe_identifier_length,
)
from .types import SQLite3toMySQLAttributes, SQLite3toMySQLParams


[docs] class SQLite3toMySQL(SQLite3toMySQLAttributes): """Use this class to transfer an SQLite 3 database to MySQL.""" COLUMN_PATTERN: t.Pattern[str] = re.compile(r"^[^(]+") COLUMN_LENGTH_PATTERN: t.Pattern[str] = re.compile(r"\(\d+\)") COLUMN_PRECISION_AND_SCALE_PATTERN: t.Pattern[str] = re.compile(r"\(\d+,\d+\)") COLUMN_UNSIGNED_PATTERN: t.Pattern[str] = re.compile(r"\bUNSIGNED\b", re.IGNORECASE) MYSQL_CONNECTOR_VERSION: version.Version = version.parse(mysql_connector_version_string)
[docs] def __init__(self, **kwargs: tx.Unpack[SQLite3toMySQLParams]): """Constructor.""" if kwargs.get("sqlite_file") is None: raise ValueError("Please provide an SQLite file") elif not isfile(str(kwargs.get("sqlite_file"))): raise FileNotFoundError("SQLite file does not exist") else: self._sqlite_file = realpath(str(kwargs.get("sqlite_file"))) if kwargs.get("mysql_user") is not None: self._mysql_user = str(kwargs.get("mysql_user")) else: raise ValueError("Please provide a MySQL user") self._mysql_password = str(kwargs.get("mysql_password")) if kwargs.get("mysql_password") else None self._mysql_host = str(kwargs.get("mysql_host", "localhost")) self._mysql_port = kwargs.get("mysql_port", 3306) or 3306 self._sqlite_tables = kwargs.get("sqlite_tables") or tuple() self._without_foreign_keys = bool(self._sqlite_tables) or bool(kwargs.get("without_foreign_keys", False)) self._mysql_ssl_disabled = bool(kwargs.get("mysql_ssl_disabled", False)) self._chunk_size = bool(kwargs.get("chunk")) self._quiet = bool(kwargs.get("quiet", False)) self._logger = self._setup_logger(log_file=kwargs.get("log_file", None), quiet=self._quiet) self._mysql_database = kwargs.get("mysql_database", "transfer") or "transfer" self._mysql_insert_method = str(kwargs.get("mysql_insert_method", "IGNORE")).upper() if self._mysql_insert_method not in MYSQL_INSERT_METHOD: self._mysql_insert_method = "IGNORE" self._mysql_truncate_tables = bool(kwargs.get("mysql_truncate_tables", False)) self._mysql_integer_type = str(kwargs.get("mysql_integer_type", "INT(11)")).upper() self._mysql_string_type = str(kwargs.get("mysql_string_type", "VARCHAR(255)")).upper() self._mysql_text_type = str(kwargs.get("mysql_text_type", "TEXT")).upper() if self._mysql_text_type not in MYSQL_TEXT_COLUMN_TYPES: self._mysql_text_type = "TEXT" self._mysql_charset = kwargs.get("mysql_charset", "utf8mb4") or "utf8mb4" self._mysql_collation = ( kwargs.get("mysql_collation") or CharacterSet().get_default_collation(self._mysql_charset.lower())[0] ) if not kwargs.get("mysql_collation") and self._mysql_collation == "utf8mb4_0900_ai_ci": self._mysql_collation = "utf8mb4_unicode_ci" self._ignore_duplicate_keys = kwargs.get("ignore_duplicate_keys", False) or False self._use_fulltext = kwargs.get("use_fulltext", False) or False self._with_rowid = kwargs.get("with_rowid", False) or False sqlite3.register_adapter(Decimal, adapt_decimal) sqlite3.register_converter("DECIMAL", convert_decimal) sqlite3.register_adapter(timedelta, adapt_timedelta) sqlite3.register_converter("DATE", convert_date) sqlite3.register_converter("TIME", convert_timedelta) self._sqlite = sqlite3.connect(realpath(self._sqlite_file), detect_types=sqlite3.PARSE_DECLTYPES) self._sqlite.row_factory = sqlite3.Row self._sqlite.create_collation("unicase", unicase_compare) self._sqlite_cur = self._sqlite.cursor() self._sqlite_version = self._get_sqlite_version() self._sqlite_table_xinfo_support = check_sqlite_table_xinfo_support(self._sqlite_version) self._mysql_create_tables = bool(kwargs.get("mysql_create_tables", True)) self._mysql_transfer_data = bool(kwargs.get("mysql_transfer_data", True)) if not self._mysql_transfer_data and not self._mysql_create_tables: raise ValueError("Unable to continue without transferring data or creating tables!") try: _mysql_connection = mysql.connector.connect( user=self._mysql_user, password=self._mysql_password, host=self._mysql_host, port=self._mysql_port, ssl_disabled=self._mysql_ssl_disabled, use_pure=True, charset=self._mysql_charset, collation=self._mysql_collation, ) if isinstance(_mysql_connection, mysql.connector.MySQLConnection): self._mysql = _mysql_connection else: raise ConnectionError("Unable to connect to MySQL") if not self._mysql.is_connected(): raise ConnectionError("Unable to connect to MySQL") self._mysql_cur = self._mysql.cursor(prepared=True) try: self._mysql.database = self._mysql_database except mysql.connector.Error as err: if err.errno == errorcode.ER_BAD_DB_ERROR: self._create_database() else: self._logger.error(err) raise self._mysql_version = self._get_mysql_version() self._mysql_json_support = check_mysql_json_support(self._mysql_version) self._mysql_fulltext_support = check_mysql_fulltext_support(self._mysql_version) if self._use_fulltext and not self._mysql_fulltext_support: raise ValueError("Your MySQL version does not support InnoDB FULLTEXT indexes!") except mysql.connector.Error as err: self._logger.error(err) raise
@classmethod def _setup_logger( cls, log_file: t.Optional[t.Union[str, "os.PathLike[t.Any]"]] = None, quiet: bool = False ) -> logging.Logger: formatter = logging.Formatter(fmt="%(asctime)s %(levelname)-8s %(message)s", datefmt="%Y-%m-%d %H:%M:%S") logger = logging.getLogger(cls.__name__) logger.setLevel(logging.DEBUG) if not quiet: screen_handler = logging.StreamHandler(stream=stdout) screen_handler.setFormatter(formatter) logger.addHandler(screen_handler) if log_file: file_handler = logging.FileHandler(realpath(log_file), mode="w") file_handler.setFormatter(formatter) logger.addHandler(file_handler) return logger def _get_mysql_version(self) -> str: try: self._mysql_cur.execute("SHOW VARIABLES LIKE 'version'") row = self._mysql_cur.fetchone() if row: return str(row[1]) else: self._logger.error("MySQL failed checking for InnoDB version") raise mysql.connector.Error("MySQL failed checking for InnoDB version") except (IndexError, mysql.connector.Error) as err: self._logger.error( "MySQL failed checking for InnoDB version: %s", err, ) raise def _get_sqlite_version(self) -> str: try: self._sqlite_cur.execute("SELECT sqlite_version()") return str(self._sqlite_cur.fetchone()[0]) except (IndexError, sqlite3.Error) as err: self._logger.error( "SQLite failed checking for InnoDB version: %s", err, ) raise def _sqlite_table_has_rowid(self, table: str) -> bool: try: self._sqlite_cur.execute(f'SELECT rowid FROM "{table}" LIMIT 1') self._sqlite_cur.fetchall() return True except sqlite3.OperationalError: return False def _create_database(self) -> None: try: self._mysql_cur.execute( f""" CREATE DATABASE IF NOT EXISTS `{self._mysql_database}` DEFAULT CHARACTER SET {self._mysql_charset} DEFAULT COLLATE {self._mysql_collation} """ ) self._mysql_cur.close() self._mysql.commit() self._mysql.database = self._mysql_database self._mysql_cur = self._mysql.cursor(prepared=True) # pylint: disable=W0201 except mysql.connector.Error as err: self._logger.error( "MySQL failed creating databse %s: %s", self._mysql_database, err, ) raise @classmethod def _valid_column_type(cls, column_type: str) -> t.Optional[t.Match[str]]: return cls.COLUMN_PATTERN.match(column_type.strip()) def _translate_type_from_sqlite_to_mysql(self, column_type: str) -> str: """This could be optimized even further, however is seems adequate.""" full_column_type: str = column_type.upper() unsigned: bool = self.COLUMN_UNSIGNED_PATTERN.search(full_column_type) is not None match: t.Optional[t.Match[str]] = self._valid_column_type(column_type) if not match: raise ValueError(f'"{column_type}" is not a valid column_type!') data_type: str = match.group(0).upper() if data_type in {"TEXT", "CLOB", "STRING"}: return self._mysql_text_type if data_type in {"CHARACTER", "NCHAR", "NATIVE CHARACTER"}: return "CHAR" + self._column_type_length(column_type) if data_type in {"VARYING CHARACTER", "NVARCHAR", "VARCHAR"}: if self._mysql_string_type in MYSQL_TEXT_COLUMN_TYPES: return self._mysql_string_type length = self._column_type_length(column_type) if not length: return self._mysql_string_type match = self._valid_column_type(self._mysql_string_type) if match: return match.group(0).upper() + length if data_type == "UNSIGNED BIG INT": return f"BIGINT{self._column_type_length(column_type)} UNSIGNED" if data_type.startswith(("TINYINT", "INT1")): return f"TINYINT{self._column_type_length(column_type)}{' UNSIGNED' if unsigned else ''}" if data_type.startswith(("SMALLINT", "INT2")): return f"SMALLINT{self._column_type_length(column_type)}{' UNSIGNED' if unsigned else ''}" if data_type.startswith(("MEDIUMINT", "INT3")): return f"MEDIUMINT{self._column_type_length(column_type)}{' UNSIGNED' if unsigned else ''}" if data_type.startswith("INT4"): return f"INT{self._column_type_length(column_type)}{' UNSIGNED' if unsigned else ''}" if data_type.startswith(("BIGINT", "INT8")): return f"BIGINT{self._column_type_length(column_type)}{' UNSIGNED' if unsigned else ''}" if data_type.startswith(("INT64", "NUMERIC")): if data_type == "NUMERIC" and self._column_type_precision_and_scale(full_column_type) != "": return f"DECIMAL{self._column_type_precision_and_scale(column_type)}{' UNSIGNED' if unsigned else ''}" return f"BIGINT{self._column_type_length(column_type, 19)}{' UNSIGNED' if unsigned else ''}" if data_type.startswith(("INTEGER", "INT")): length = self._column_type_length(column_type) if not length: if "UNSIGNED" in self._mysql_integer_type: return self._mysql_integer_type return f"{self._mysql_integer_type}{' UNSIGNED' if unsigned else ''}" match = self._valid_column_type(self._mysql_integer_type) if match: if "UNSIGNED" in self._mysql_integer_type: return f"{match.group(0).upper()}{length} UNSIGNED" return f"{match.group(0).upper()}{length}{' UNSIGNED' if unsigned else ''}" if data_type in {"BOOL", "BOOLEAN"}: return "TINYINT(1)" if data_type.startswith(("REAL", "DOUBLE", "FLOAT", "DECIMAL", "DEC", "FIXED")): return full_column_type if data_type not in MYSQL_COLUMN_TYPES: return self._mysql_string_type return full_column_type @classmethod def _column_type_length(cls, column_type: str, default: t.Optional[t.Union[str, int, float]] = None) -> str: suffix: t.Optional[t.Match[str]] = cls.COLUMN_LENGTH_PATTERN.search(column_type) if suffix: return suffix.group(0) if default: return f"({default})" return "" @classmethod def _column_type_precision_and_scale(cls, column_type: str) -> str: suffix: t.Optional[t.Match[str]] = cls.COLUMN_PRECISION_AND_SCALE_PATTERN.search(column_type) if suffix: return suffix.group(0) return "" def _create_table(self, table_name: str, transfer_rowid: bool = False) -> None: primary_keys: t.List[t.Dict[str, str]] = [] sql: str = f"CREATE TABLE IF NOT EXISTS `{safe_identifier_length(table_name)}` ( " if transfer_rowid: sql += " `rowid` BIGINT NOT NULL, " if self._sqlite_table_xinfo_support: self._sqlite_cur.execute(f'PRAGMA table_xinfo("{table_name}")') else: self._sqlite_cur.execute(f'PRAGMA table_info("{table_name}")') rows: t.List[t.Any] = self._sqlite_cur.fetchall() compound_primary_key: bool = len(tuple(True for row in rows if dict(row)["pk"] > 0)) > 1 for row in rows: column: t.Dict[str, t.Any] = dict(row) mysql_safe_name: str = safe_identifier_length(column["name"]) column_type: str = self._translate_type_from_sqlite_to_mysql(column["type"]) # The "hidden" value is 0 for visible columns, 1 for "hidden" columns, # 2 for computed virtual columns and 3 for computed stored columns. # Read more on hidden columns here https://www.sqlite.org/pragma.html#pragma_table_xinfo if "hidden" in column and column["hidden"] == 1: continue auto_increment: bool = ( column["pk"] > 0 and column_type.startswith(("INT", "BIGINT")) and not compound_primary_key ) sql += " `{name}` {type} {notnull} {default} {auto_increment}, ".format( name=mysql_safe_name, type=column_type, notnull="NOT NULL" if column["notnull"] or column["pk"] else "NULL", auto_increment="AUTO_INCREMENT" if auto_increment else "", default=( "DEFAULT " + column["dflt_value"] if column["dflt_value"] and column_type not in MYSQL_COLUMN_TYPES_WITHOUT_DEFAULT and not auto_increment else "" ), ) if column["pk"] > 0: primary_key: t.Dict[str, str] = { "column": mysql_safe_name, "length": "", } # In case we have a non-numeric primary key if column_type in ( MYSQL_TEXT_COLUMN_TYPES_WITH_JSON + MYSQL_BLOB_COLUMN_TYPES ) or column_type.startswith(("CHAR", "VARCHAR")): primary_key["length"] = self._column_type_length(column_type, 255) primary_keys.append(primary_key) sql = sql.rstrip(", ") if len(primary_keys) > 0: sql += ", PRIMARY KEY ({columns})".format( columns=", ".join("`{column}`{length}".format(**primary_key) for primary_key in primary_keys) ) if transfer_rowid: sql += f", CONSTRAINT `{safe_identifier_length(table_name)}_rowid` UNIQUE (`rowid`)" sql += f" ) ENGINE=InnoDB DEFAULT CHARSET={self._mysql_charset} COLLATE={self._mysql_collation}" try: self._mysql_cur.execute(sql) self._mysql.commit() except mysql.connector.Error as err: self._logger.error( "MySQL failed creating table %s: %s", safe_identifier_length(table_name), err, ) raise def _truncate_table(self, table_name: str) -> None: self._mysql_cur.execute( """ SELECT `TABLE_NAME` FROM `INFORMATION_SCHEMA`.`TABLES` WHERE `TABLE_SCHEMA` = %s AND `TABLE_NAME` = %s LIMIT 1 """, (self._mysql_database, safe_identifier_length(table_name)), ) if len(self._mysql_cur.fetchall()) > 0: self._logger.info("Truncating table %s", safe_identifier_length(table_name)) self._mysql_cur.execute(f"TRUNCATE TABLE `{safe_identifier_length(table_name)}`") def _add_indices(self, table_name: str) -> None: self._sqlite_cur.execute(f'PRAGMA table_info("{table_name}")') table_columns: t.Dict[str, str] = {} for row in self._sqlite_cur.fetchall(): column: t.Dict[str, t.Any] = dict(row) table_columns[column["name"]] = column["type"] self._sqlite_cur.execute(f'PRAGMA index_list("{table_name}")') indices: t.Tuple[t.Dict[str, t.Any], ...] = tuple(dict(row) for row in self._sqlite_cur.fetchall()) for index in indices: if index["origin"] == "pk": continue self._sqlite_cur.execute(f'PRAGMA index_info("{index["name"]}")') index_infos: t.Tuple[t.Dict[str, t.Any], ...] = tuple(dict(row) for row in self._sqlite_cur.fetchall()) index_type: str = "UNIQUE" if int(index["unique"]) == 1 else "INDEX" if any( table_columns[index_info["name"]].upper() in MYSQL_TEXT_COLUMN_TYPES_WITH_JSON for index_info in index_infos ): if self._use_fulltext and self._mysql_fulltext_support: # Use fulltext if requested and available index_type = "FULLTEXT" index_columns: str = ",".join( f'`{safe_identifier_length(index_info["name"])}`' for index_info in index_infos ) else: # Limit the max TEXT field index length to 255 index_columns = ", ".join( "`{column}`{length}".format( column=safe_identifier_length(index_info["name"]), length=( "(255)" if table_columns[index_info["name"]].upper() in MYSQL_TEXT_COLUMN_TYPES_WITH_JSON else "" ), ) for index_info in index_infos ) else: column_list: t.List[str] = [] for index_info in index_infos: index_length: str = "" # Limit the max BLOB field index length to 255 if table_columns[index_info["name"]].upper() in MYSQL_BLOB_COLUMN_TYPES: index_length = "(255)" else: suffix: t.Optional[t.Match[str]] = self.COLUMN_LENGTH_PATTERN.search( table_columns[index_info["name"]] ) if suffix: index_length = suffix.group(0) column_list.append(f'`{safe_identifier_length(index_info["name"])}`{index_length}') index_columns = ", ".join(column_list) try: self._add_index( table_name=table_name, index_type=index_type, index=index, index_columns=index_columns, index_infos=index_infos, ) except mysql.connector.Error as err: if err.errno == errorcode.ER_BAD_FT_COLUMN and index_type == "FULLTEXT": # handle bad FULLTEXT index self._add_index( table_name=table_name, index_type="UNIQUE" if int(index["unique"]) == 1 else "INDEX", index=index, index_columns=", ".join( "`{column}`{length}".format( column=safe_identifier_length(index_info["name"]), length=( "(255)" if table_columns[index_info["name"]].upper() in MYSQL_TEXT_COLUMN_TYPES_WITH_JSON else "" ), ) for index_info in index_infos ), index_infos=index_infos, ) else: raise def _add_index( self, table_name: str, index_type: str, index: t.Dict[str, t.Any], index_columns: str, index_infos: t.Tuple[t.Dict[str, t.Any], ...], index_iteration: int = 0, ) -> None: sql: str = ( """ ALTER TABLE `{table}` ADD {index_type} `{name}`({columns}) """.format( table=safe_identifier_length(table_name), index_type=index_type, name=( safe_identifier_length(index["name"]) if index_iteration == 0 else f'{safe_identifier_length(index["name"], max_length=60)}_{index_iteration}' ), columns=index_columns, ) ) try: self._logger.info( """Adding %s to column "%s" in table %s""", "unique index" if int(index["unique"]) == 1 else "index", ", ".join(safe_identifier_length(index_info["name"]) for index_info in index_infos), safe_identifier_length(table_name), ) self._mysql_cur.execute(sql) self._mysql.commit() except mysql.connector.Error as err: if err.errno == errorcode.ER_DUP_KEYNAME: if not self._ignore_duplicate_keys: # handle a duplicate key name self._add_index( table_name=table_name, index_type=index_type, index=index, index_columns=index_columns, index_infos=index_infos, index_iteration=index_iteration + 1, ) self._logger.warning( """Duplicate key "%s" in table %s detected! Trying to create new key "%s_%s" ...""", safe_identifier_length(index["name"]), safe_identifier_length(table_name), safe_identifier_length(index["name"]), index_iteration + 1, ) else: self._logger.warning( """Ignoring duplicate key "%s" in table %s!""", safe_identifier_length(index["name"]), safe_identifier_length(table_name), ) elif err.errno == errorcode.ER_BAD_FT_COLUMN: # handle bad FULLTEXT index self._logger.warning( """Failed adding FULLTEXT index to column "%s" in table %s. Retrying without FULLTEXT ...""", ", ".join(safe_identifier_length(index_info["name"]) for index_info in index_infos), safe_identifier_length(table_name), ) raise else: self._logger.error( """MySQL failed adding index to column "%s" in table %s: %s""", ", ".join(safe_identifier_length(index_info["name"]) for index_info in index_infos), safe_identifier_length(table_name), err, ) raise def _add_foreign_keys(self, table_name: str) -> None: self._sqlite_cur.execute(f'PRAGMA foreign_key_list("{table_name}")') for row in self._sqlite_cur.fetchall(): foreign_key: t.Dict[str, t.Any] = dict(row) sql = """ ALTER TABLE `{table}` ADD CONSTRAINT `{table}_FK_{id}_{seq}` FOREIGN KEY (`{column}`) REFERENCES `{ref_table}`(`{ref_column}`) ON DELETE {on_delete} ON UPDATE {on_update} """.format( id=foreign_key["id"], seq=foreign_key["seq"], table=safe_identifier_length(table_name), column=safe_identifier_length(foreign_key["from"]), ref_table=safe_identifier_length(foreign_key["table"]), ref_column=safe_identifier_length(foreign_key["to"]), on_delete=( foreign_key["on_delete"].upper() if foreign_key["on_delete"].upper() != "SET DEFAULT" else "NO ACTION" ), on_update=( foreign_key["on_update"].upper() if foreign_key["on_update"].upper() != "SET DEFAULT" else "NO ACTION" ), ) try: self._logger.info( "Adding foreign key to %s.%s referencing %s.%s", safe_identifier_length(table_name), safe_identifier_length(foreign_key["from"]), safe_identifier_length(foreign_key["table"]), safe_identifier_length(foreign_key["to"]), ) self._mysql_cur.execute(sql) self._mysql.commit() except mysql.connector.Error as err: self._logger.error( "MySQL failed adding foreign key to %s.%s referencing %s.%s: %s", safe_identifier_length(table_name), safe_identifier_length(foreign_key["from"]), safe_identifier_length(foreign_key["table"]), safe_identifier_length(foreign_key["to"]), err, ) raise def _transfer_table_data(self, sql: str, total_records: int = 0) -> None: if self._chunk_size is not None and self._chunk_size > 0: for _ in trange(0, int(ceil(total_records / self._chunk_size)), disable=self._quiet): self._mysql_cur.executemany( sql, (tuple(row) for row in self._sqlite_cur.fetchmany(self._chunk_size)), # type: ignore ) else: self._mysql_cur.executemany( sql, ( # type: ignore tuple(row) for row in tqdm( self._sqlite_cur.fetchall(), total=total_records, disable=self._quiet, ) ), ) self._mysql.commit()
[docs] def transfer(self) -> None: """The primary and only method with which we transfer all the data.""" if len(self._sqlite_tables) > 0: # transfer only specific tables self._sqlite_cur.execute( f""" SELECT name FROM sqlite_master WHERE type='table' AND name NOT LIKE 'sqlite_%' AND name IN({("?, " * len(self._sqlite_tables)).rstrip(" ,")}) """, self._sqlite_tables, ) else: # transfer all tables self._sqlite_cur.execute( """ SELECT name FROM sqlite_master WHERE type='table' AND name NOT LIKE 'sqlite_%' """ ) try: self._mysql_cur.execute("SET FOREIGN_KEY_CHECKS=0") for row in self._sqlite_cur.fetchall(): table: t.Dict[str, t.Any] = dict(row) # check if we're transferring rowid transfer_rowid: bool = self._with_rowid and self._sqlite_table_has_rowid(table["name"]) # create the table if self._mysql_create_tables: self._create_table(table["name"], transfer_rowid=transfer_rowid) # truncate the table on request if self._mysql_truncate_tables: self._truncate_table(table["name"]) # get the size of the data if self._mysql_transfer_data: self._sqlite_cur.execute(f'SELECT COUNT(*) AS total_records FROM "{table["name"]}"') total_records = int(dict(self._sqlite_cur.fetchone())["total_records"]) else: total_records = 0 # only continue if there is anything to transfer if total_records > 0: # populate it self._logger.info("Transferring table %s", table["name"]) self._sqlite_cur.execute( '''SELECT {rowid} * FROM "{table_name}"'''.format( rowid='rowid as "rowid",' if transfer_rowid else "", table_name=table["name"], ) ) columns: t.List[str] = [ safe_identifier_length(column[0]) for column in self._sqlite_cur.description ] sql: str if self._mysql_insert_method.upper() == "UPDATE": sql = """ INSERT INTO `{table}` ({fields}) {values_clause} ON DUPLICATE KEY UPDATE {field_updates} """.format( table=safe_identifier_length(table["name"]), fields=("`{}`, " * len(columns)).rstrip(" ,").format(*columns), values_clause=( "VALUES ({placeholders}) AS `__new__`" if check_mysql_values_alias_support(self._mysql_version) else "VALUES ({placeholders})" ).format(placeholders=("%s, " * len(columns)).rstrip(" ,")), field_updates=( ("`{}`=`__new__`.`{}`, " * len(columns)).rstrip(" ,") if check_mysql_values_alias_support(self._mysql_version) else ("`{}`=`{}`, " * len(columns)).rstrip(" ,") ).format(*list(chain.from_iterable((column, column) for column in columns))), ) else: sql = """ INSERT {ignore} INTO `{table}` ({fields}) VALUES ({placeholders}) """.format( ignore="IGNORE" if self._mysql_insert_method.upper() == "IGNORE" else "", table=safe_identifier_length(table["name"]), fields=("`{}`, " * len(columns)).rstrip(" ,").format(*columns), placeholders=("%s, " * len(columns)).rstrip(" ,"), ) try: self._transfer_table_data(sql=sql, total_records=total_records) except mysql.connector.Error as err: self._logger.error( "MySQL transfer failed inserting data into table %s: %s", safe_identifier_length(table["name"]), err, ) raise # add indices if self._mysql_create_tables: self._add_indices(table["name"]) # add foreign keys if self._mysql_create_tables and not self._without_foreign_keys: self._add_foreign_keys(table["name"]) except Exception: # pylint: disable=W0706 raise finally: self._mysql_cur.execute("SET FOREIGN_KEY_CHECKS=1") self._logger.info("Done!")