Source code for mysql_to_sqlite3.transporter

"""Use to transfer a MySQL database to SQLite."""

import logging
import os
import re
import sqlite3
import typing as t
from datetime import timedelta
from decimal import Decimal
from math import ceil
from os.path import realpath
from sys import stdout

import mysql.connector
from mysql.connector import CharacterSet, errorcode
from mysql.connector.abstracts import MySQLConnectionAbstract
from mysql.connector.types import RowItemType
from sqlglot import Expression, exp, parse_one
from sqlglot.errors import ParseError
from tqdm import tqdm, trange


try:
    # Python 3.11+
    from typing import Unpack  # type: ignore[attr-defined]
except ImportError:
    # Python < 3.11
    from typing_extensions import Unpack  # type: ignore

from mysql_to_sqlite3.mysql_utils import CHARSET_INTRODUCERS
from mysql_to_sqlite3.sqlite_utils import (
    CollatingSequences,
    Integer_Types,
    adapt_decimal,
    adapt_timedelta,
    convert_date,
    convert_decimal,
    convert_timedelta,
    encode_data_for_sqlite,
)
from mysql_to_sqlite3.types import MySQLtoSQLiteAttributes, MySQLtoSQLiteParams


[docs] class MySQLtoSQLite(MySQLtoSQLiteAttributes): """Use this class to transfer a MySQL database to SQLite.""" COLUMN_PATTERN: t.Pattern[str] = re.compile(r"^[^(]+") COLUMN_LENGTH_PATTERN: t.Pattern[str] = re.compile(r"\(\d+\)$")
[docs] def __init__(self, **kwargs: Unpack[MySQLtoSQLiteParams]) -> None: """Constructor.""" if kwargs.get("mysql_database") is not None: self._mysql_database = str(kwargs.get("mysql_database")) else: raise ValueError("Please provide a MySQL database") if kwargs.get("mysql_user") is not None: self._mysql_user = str(kwargs.get("mysql_user")) else: raise ValueError("Please provide a MySQL user") if kwargs.get("sqlite_file") is None: raise ValueError("Please provide an SQLite file") else: self._sqlite_file = realpath(str(kwargs.get("sqlite_file"))) password: t.Optional[t.Union[str, bool]] = kwargs.get("mysql_password") self._mysql_password = password if isinstance(password, str) else None self._mysql_host = kwargs.get("mysql_host", "localhost") or "localhost" self._mysql_port = kwargs.get("mysql_port", 3306) or 3306 self._mysql_charset = kwargs.get("mysql_charset", "utf8mb4") or "utf8mb4" self._mysql_collation = ( kwargs.get("mysql_collation") or CharacterSet().get_default_collation(self._mysql_charset.lower())[0] ) if not kwargs.get("mysql_collation") and self._mysql_collation == "utf8mb4_0900_ai_ci": self._mysql_collation = "utf8mb4_unicode_ci" self._mysql_tables = kwargs.get("mysql_tables") or tuple() self._exclude_mysql_tables = kwargs.get("exclude_mysql_tables") or tuple() if bool(self._mysql_tables) and bool(self._exclude_mysql_tables): raise ValueError("mysql_tables and exclude_mysql_tables are mutually exclusive") self._limit_rows = kwargs.get("limit_rows", 0) or 0 if kwargs.get("collation") is not None and str(kwargs.get("collation")).upper() in { CollatingSequences.BINARY, CollatingSequences.NOCASE, CollatingSequences.RTRIM, }: self._collation = str(kwargs.get("collation")).upper() else: self._collation = CollatingSequences.BINARY self._prefix_indices = kwargs.get("prefix_indices", False) or False if bool(self._mysql_tables) or bool(self._exclude_mysql_tables): self._without_foreign_keys = True else: self._without_foreign_keys = bool(kwargs.get("without_foreign_keys", False)) self._without_data = bool(kwargs.get("without_data", False)) self._without_tables = bool(kwargs.get("without_tables", False)) if self._without_tables and self._without_data: raise ValueError("Unable to continue without transferring data or creating tables!") self._mysql_ssl_disabled = bool(kwargs.get("mysql_ssl_disabled", False)) self._current_chunk_number = 0 self._chunk_size = kwargs.get("chunk") or None self._buffered = bool(kwargs.get("buffered", False)) self._vacuum = bool(kwargs.get("vacuum", False)) self._quiet = bool(kwargs.get("quiet", False)) self._views_as_views = bool(kwargs.get("views_as_views", True)) self._sqlite_strict = bool(kwargs.get("sqlite_strict", False)) self._logger = self._setup_logger(log_file=kwargs.get("log_file") or None, quiet=self._quiet) if self._sqlite_strict and sqlite3.sqlite_version < "3.37.0": self._logger.warning( "SQLite version %s does not support STRICT tables. Tables will be created without strict mode.", sqlite3.sqlite_version, ) self._sqlite_strict = False sqlite3.register_adapter(Decimal, adapt_decimal) sqlite3.register_converter("DECIMAL", convert_decimal) sqlite3.register_adapter(timedelta, adapt_timedelta) sqlite3.register_converter("DATE", convert_date) sqlite3.register_converter("TIME", convert_timedelta) self._sqlite = sqlite3.connect(realpath(self._sqlite_file), detect_types=sqlite3.PARSE_DECLTYPES) self._sqlite.row_factory = sqlite3.Row self._sqlite_cur = self._sqlite.cursor() self._json_as_text = bool(kwargs.get("json_as_text", False)) self._sqlite_json1_extension_enabled = not self._json_as_text and self._check_sqlite_json1_extension_enabled() # Track seen SQLite index names to generate unique names when prefixing is disabled self._seen_sqlite_index_names: t.Set[str] = set() # Counter for duplicate index names to assign numeric suffixes (name_2, name_3, ...) self._sqlite_index_name_counters: t.Dict[str, int] = {} try: _mysql_connection = mysql.connector.connect( user=self._mysql_user, password=self._mysql_password, host=self._mysql_host, port=self._mysql_port, ssl_disabled=self._mysql_ssl_disabled, charset=self._mysql_charset, collation=self._mysql_collation, ) if isinstance(_mysql_connection, MySQLConnectionAbstract): self._mysql = _mysql_connection else: raise ConnectionError("Unable to connect to MySQL") if not self._mysql.is_connected(): raise ConnectionError("Unable to connect to MySQL") self._mysql_cur = self._mysql.cursor(buffered=self._buffered, raw=True) # type: ignore[assignment] self._mysql_cur_prepared = self._mysql.cursor(prepared=True) # type: ignore[assignment] self._mysql_cur_dict = self._mysql.cursor( # type: ignore[assignment] buffered=self._buffered, dictionary=True, ) try: self._mysql.database = self._mysql_database except (mysql.connector.Error, Exception) as err: if hasattr(err, "errno") and err.errno == errorcode.ER_BAD_DB_ERROR: self._logger.error("MySQL Database does not exist!") raise self._logger.error(err) raise except mysql.connector.Error as err: self._logger.error(err) raise
@classmethod def _setup_logger( cls, log_file: t.Optional[t.Union[str, "os.PathLike[t.Any]"]] = None, quiet: bool = False ) -> logging.Logger: formatter: logging.Formatter = logging.Formatter( fmt="%(asctime)s %(levelname)-8s %(message)s", datefmt="%Y-%m-%d %H:%M:%S" ) logger: logging.Logger = logging.getLogger(cls.__name__) logger.setLevel(logging.DEBUG) if not quiet: screen_handler = logging.StreamHandler(stream=stdout) screen_handler.setFormatter(formatter) logger.addHandler(screen_handler) if log_file: file_handler = logging.FileHandler(realpath(log_file), mode="w") file_handler.setFormatter(formatter) logger.addHandler(file_handler) return logger @classmethod def _valid_column_type(cls, column_type: str) -> t.Optional[t.Match[str]]: return cls.COLUMN_PATTERN.match(column_type.strip()) @classmethod def _column_type_length(cls, column_type: str) -> str: suffix: t.Optional[t.Match[str]] = cls.COLUMN_LENGTH_PATTERN.search(column_type) if suffix: return suffix.group(0) return "" @staticmethod def _decode_column_type(column_type: t.Union[str, bytes]) -> str: if isinstance(column_type, str): return column_type if isinstance(column_type, bytes): try: return column_type.decode() except (UnicodeDecodeError, AttributeError): pass return str(column_type) @classmethod def _translate_type_from_mysql_to_sqlite( cls, column_type: t.Union[str, bytes], sqlite_json1_extension_enabled=False ) -> str: _column_type: str = cls._decode_column_type(column_type) # This could be optimized even further, however is seems adequate. match: t.Optional[t.Match[str]] = cls._valid_column_type(_column_type) if not match: raise ValueError(f'"{_column_type}" is not a valid column_type!') data_type: str = match.group(0).upper() if data_type.endswith(" UNSIGNED"): data_type = data_type.replace(" UNSIGNED", "") if data_type in { "BIGINT", "BLOB", "BOOLEAN", "DATE", "DATETIME", "DECIMAL", "DOUBLE", "FLOAT", "INTEGER", "MEDIUMINT", "NUMERIC", "REAL", "SMALLINT", "TIME", "TINYINT", "YEAR", }: return data_type if data_type == "DOUBLE PRECISION": return "DOUBLE" if data_type == "FIXED": return "DECIMAL" if data_type in {"CHARACTER VARYING", "CHAR VARYING"}: return "VARCHAR" + cls._column_type_length(_column_type) if data_type in {"NATIONAL CHARACTER VARYING", "NATIONAL CHAR VARYING", "NATIONAL VARCHAR"}: return "NVARCHAR" + cls._column_type_length(_column_type) if data_type == "NATIONAL CHARACTER": return "NCHAR" + cls._column_type_length(_column_type) if data_type in { "BIT", "BINARY", "LONGBLOB", "MEDIUMBLOB", "TINYBLOB", "VARBINARY", }: return "BLOB" if data_type in {"NCHAR", "NVARCHAR", "VARCHAR"}: return data_type + cls._column_type_length(_column_type) if data_type == "CHAR": return "CHARACTER" + cls._column_type_length(_column_type) if data_type == "INT": return "INTEGER" if data_type in "TIMESTAMP": return "DATETIME" if data_type == "JSON" and sqlite_json1_extension_enabled: return "JSON" # As a last resort, try sqlglot to derive a better SQLite-compatible type sqlglot_type: t.Optional[str] = cls._transpile_mysql_type_to_sqlite( _column_type, sqlite_json1_extension_enabled=sqlite_json1_extension_enabled ) if sqlglot_type: return sqlglot_type return "TEXT" @classmethod def _transpile_mysql_expr_to_sqlite(cls, expr_sql: str) -> t.Optional[str]: """Transpile a MySQL scalar expression to SQLite using sqlglot. Returns the SQLite SQL string on success, or None on failure. """ cleaned: str = expr_sql.strip().rstrip(";") try: tree: Expression = parse_one(cleaned, read="mysql") return tree.sql(dialect="sqlite") except (ParseError, ValueError): return None except (AttributeError, TypeError): # pragma: no cover - unexpected sqlglot failure logging.getLogger(cls.__name__ if hasattr(cls, "__name__") else "MySQLtoSQLite").debug( "sqlglot failed to transpile expr: %r", expr_sql ) return None @classmethod def _normalize_literal_with_sqlglot(cls, expr_sql: str) -> t.Optional[str]: """Normalize a MySQL literal using sqlglot, returning SQLite SQL if literal-like.""" cleaned: str = expr_sql.strip().rstrip(";") try: node: Expression = parse_one(cleaned, read="mysql") except (ParseError, ValueError): return None if isinstance(node, exp.Literal): return node.sql(dialect="sqlite") if isinstance(node, exp.Paren) and isinstance(node.this, exp.Literal): return node.this.sql(dialect="sqlite") return None @staticmethod def _quote_sqlite_identifier(name: t.Union[str, bytes, bytearray]) -> str: """Safely quote an identifier for SQLite using sqlglot. Always returns a double-quoted identifier, doubling any embedded quotes. Accepts bytes and decodes them to str when needed. """ if isinstance(name, (bytes, bytearray)): try: s: str = name.decode() except (UnicodeDecodeError, AttributeError): s = str(name) else: s = str(name) try: # Normalize identifier using sqlglot, then wrap in quotes regardless normalized: str = exp.to_identifier(s).name except (AttributeError, ValueError, TypeError): # pragma: no cover - extremely unlikely normalized = s replaced: str = normalized.replace('"', '""') return f'"{replaced}"' @staticmethod def _escape_mysql_backticks(identifier: str) -> str: """Escape backticks in a MySQL identifier for safe backtick quoting.""" return identifier.replace("`", "``") @classmethod def _transpile_mysql_type_to_sqlite( cls, column_type: str, sqlite_json1_extension_enabled: bool = False ) -> t.Optional[str]: """Attempt to derive a suitable SQLite column type using sqlglot. This is used as a last-resort fallback when the built-in mapper does not recognize a MySQL type or synonym. It keeps existing behavior for known types and only augments unknowns. """ # Wrap the type in a CAST expression so sqlglot can parse it consistently. expr_sql: str = f"CAST(NULL AS {column_type.strip()})" try: tree: Expression = parse_one(expr_sql, read="mysql") rendered: str = tree.sql(dialect="sqlite") except (ParseError, ValueError, AttributeError, TypeError): return None # Extract the type inside CAST(NULL AS ...) m: t.Optional[t.Match[str]] = re.search(r"CAST\(NULL AS\s+([^)]+)\)", rendered, re.IGNORECASE) if not m: return None extracted: str = m.group(1).strip() upper: str = extracted.upper() # JSON handling: respect availability of JSON1 extension if "JSON" in upper: return "JSON" if sqlite_json1_extension_enabled else "TEXT" # Split out optional length suffix like (255) or (10,2) base: str = upper length_suffix: str = "" paren: t.Optional[t.Match[str]] = re.match(r"^([A-Z ]+)(\(.*\))$", upper) if paren: base = paren.group(1).strip() length_suffix = paren.group(2) # Minimal synonym normalization synonyms: t.Dict[str, str] = { "DOUBLE PRECISION": "DOUBLE", "FIXED": "DECIMAL", "CHAR VARYING": "VARCHAR", "CHARACTER VARYING": "VARCHAR", "NATIONAL VARCHAR": "NVARCHAR", "NATIONAL CHARACTER VARYING": "NVARCHAR", "NATIONAL CHAR VARYING": "NVARCHAR", "NATIONAL CHARACTER": "NCHAR", } base = synonyms.get(base, base) # Decide the final SQLite type, aligning with existing conventions if base in {"NCHAR", "NVARCHAR", "VARCHAR"} and length_suffix: return f"{base}{length_suffix}" if base in {"CHAR", "CHARACTER"}: return f"CHARACTER{length_suffix}" if base in {"DECIMAL", "NUMERIC"}: # Keep without length to match the existing mapper's style return base if base in { "DOUBLE", "REAL", "FLOAT", "INTEGER", "BIGINT", "SMALLINT", "MEDIUMINT", "TINYINT", "BLOB", "DATE", "DATETIME", "TIME", "YEAR", "BOOLEAN", }: return base if base in {"VARBINARY", "BINARY", "TINYBLOB", "MEDIUMBLOB", "LONGBLOB"}: return "BLOB" if base in {"TEXT", "TINYTEXT", "MEDIUMTEXT", "LONGTEXT", "CLOB"}: return "TEXT" # ENUM/SET and other complex types -> keep default behavior (TEXT) if base in {"ENUM", "SET"}: return "TEXT" # If we reached here, we didn't find a better mapping return None @classmethod def _translate_default_from_mysql_to_sqlite( cls, column_default: RowItemType = None, column_type: t.Optional[str] = None, column_extra: RowItemType = None, ) -> str: is_binary: bool is_hex: bool if isinstance(column_default, bytes): if column_type in { "BIT", "BINARY", "BLOB", "LONGBLOB", "MEDIUMBLOB", "TINYBLOB", "VARBINARY", }: if column_extra in {"DEFAULT_GENERATED", "default_generated"}: for charset_introducer in CHARSET_INTRODUCERS: if column_default.startswith(charset_introducer.encode()): is_binary = False is_hex = False for b_prefix in ("B", "b"): if column_default.startswith(rf"{charset_introducer} {b_prefix}\'".encode()): is_binary = True break for x_prefix in ("X", "x"): if column_default.startswith(rf"{charset_introducer} {x_prefix}\'".encode()): is_hex = True break column_default = ( column_default.replace(charset_introducer.encode(), b"") .replace(rb"x\'", b"") .replace(rb"X\'", b"") .replace(rb"b\'", b"") .replace(rb"B\'", b"") .replace(rb"\'", b"") .replace(rb"'", b"") .strip() ) if is_binary: return f"DEFAULT '{chr(int(column_default, 2))}'" if is_hex: return f"DEFAULT x'{column_default.decode()}'" break return f"DEFAULT x'{column_default.hex()}'" try: column_default = column_default.decode() except (UnicodeDecodeError, AttributeError): pass if column_default is None: return "" if isinstance(column_default, bool): if column_type == "BOOLEAN" and sqlite3.sqlite_version >= "3.23.0": if column_default: return "DEFAULT(TRUE)" return "DEFAULT(FALSE)" return f"DEFAULT '{int(column_default)}'" if isinstance(column_default, str): if column_default.lower() == "curtime()": return "DEFAULT CURRENT_TIME" if column_default.lower() == "curdate()": return "DEFAULT CURRENT_DATE" if column_default.lower() in {"current_timestamp()", "now()"}: return "DEFAULT CURRENT_TIMESTAMP" if column_extra in {"DEFAULT_GENERATED", "default_generated"}: if column_default.upper() in { "CURRENT_TIME", "CURRENT_DATE", "CURRENT_TIMESTAMP", }: return f"DEFAULT {column_default.upper()}" for charset_introducer in CHARSET_INTRODUCERS: if column_default.startswith(charset_introducer): is_binary = False is_hex = False for b_prefix in ("B", "b"): if column_default.startswith(rf"{charset_introducer} {b_prefix}\'"): is_binary = True break for x_prefix in ("X", "x"): if column_default.startswith(rf"{charset_introducer} {x_prefix}\'"): is_hex = True break column_default = ( column_default.replace(charset_introducer, "") .replace(r"x\'", "") .replace(r"X\'", "") .replace(r"b\'", "") .replace(r"B\'", "") .replace(r"\'", "") .replace(r"'", "") .strip() ) if is_binary: return f"DEFAULT '{chr(int(column_default, 2))}'" if is_hex: return f"DEFAULT x'{column_default}'" return f"DEFAULT '{column_default}'" transpiled: t.Optional[str] = cls._transpile_mysql_expr_to_sqlite(column_default) if transpiled: norm: str = transpiled.strip().rstrip(";") upper: str = norm.upper() if upper in {"CURRENT_TIME", "CURRENT_DATE", "CURRENT_TIMESTAMP"}: return f"DEFAULT {upper}" if upper == "NULL": return "DEFAULT NULL" # Allow blob hex literal X'..' if re.match(r"^[Xx]'[0-9A-Fa-f]+'$", norm): return f"DEFAULT {norm}" # Support boolean tokens when provided as generated strings if upper in {"TRUE", "FALSE"}: if column_type == "BOOLEAN" and sqlite3.sqlite_version >= "3.23.0": return f"DEFAULT({upper})" return f"DEFAULT '{1 if upper == 'TRUE' else 0}'" # Unwrap a single layer of parenthesis around a literal if norm.startswith("(") and norm.endswith(")"): inner = norm[1:-1].strip() if (inner.startswith("'") and inner.endswith("'")) or re.match(r"^-?\d+(?:\.\d+)?$", inner): return f"DEFAULT {inner}" # If the expression is arithmetic-only over numeric literals, allow as-is if re.match(r"^[\d\.\s\+\-\*/\(\)]+$", norm) and any(ch.isdigit() for ch in norm): return f"DEFAULT {norm}" # Allow numeric or single-quoted string literals as-is if (norm.startswith("'") and norm.endswith("'")) or re.match(r"^-?\d+(?:\.\d+)?$", norm): return f"DEFAULT {norm}" # Allow simple arithmetic constant expressions composed of numbers and + - * / if re.match(r"^[\d\.\s\+\-\*/\(\)]+$", norm) and any(ch.isdigit() for ch in norm): return f"DEFAULT {norm}" stripped_default = column_default.strip() if stripped_default.startswith("'") or ( stripped_default.startswith("(") and stripped_default.endswith(")") ): normalized_literal: t.Optional[str] = cls._normalize_literal_with_sqlglot(column_default) if normalized_literal is not None: return f"DEFAULT {normalized_literal}" # Fallback: robustly escape single quotes for plain string defaults _escaped = column_default.replace("\\'", "'") _escaped = _escaped.replace("'", "''") return f"DEFAULT '{_escaped}'" s = str(column_default) s = s.replace("\\'", "'") s = s.replace("'", "''") return f"DEFAULT '{s}'" @classmethod def _data_type_collation_sequence( cls, collation: str = CollatingSequences.BINARY, column_type: t.Optional[str] = None ) -> str: """Return a SQLite COLLATE clause for textual affinity types. Augmented with sqlglot: if the provided type string does not match the quick textual prefixes, we attempt to transpile it to a SQLite type and then apply SQLite's textual affinity rules (contains CHAR/CLOB/TEXT or their NV*/VAR* variants). This improves handling of MySQL synonyms like CHAR VARYING / CHARACTER VARYING / NATIONAL CHARACTER VARYING. """ if not column_type or collation == CollatingSequences.BINARY: return "" ct: str = column_type.strip() upper: str = ct.upper() # Fast-path for already normalized SQLite textual types if upper.startswith(("CHARACTER", "NCHAR", "NVARCHAR", "TEXT", "VARCHAR")): return f"COLLATE {collation}" # Avoid collations for JSON/BLOB explicitly if "JSON" in upper or "BLOB" in upper: return "" # If the type string obviously denotes text affinity, apply collation if any(tok in upper for tok in ("VARCHAR", "NVARCHAR", "NCHAR", "CHAR", "TEXT", "CLOB", "CHARACTER")): return f"COLLATE {collation}" # Try to map uncommon/synonym types to a SQLite type using sqlglot-based transpiler mapped: t.Optional[str] = cls._transpile_mysql_type_to_sqlite(ct) if mapped: mu = mapped.upper() if ( "CHAR" in mu or "VARCHAR" in mu or "NCHAR" in mu or "NVARCHAR" in mu or "TEXT" in mu or "CLOB" in mu ) and not ("JSON" in mu or "BLOB" in mu): return f"COLLATE {collation}" return "" def _check_sqlite_json1_extension_enabled(self) -> bool: try: self._sqlite_cur.execute("PRAGMA compile_options") return "ENABLE_JSON1" in set(row[0] for row in self._sqlite_cur.fetchall()) except sqlite3.Error: return False def _get_unique_index_name(self, base_name: str) -> str: """Return a unique SQLite index name based on base_name. If base_name has not been used yet, it is returned as-is and recorded. If it has been used, a numeric suffix is appended starting from 2 (e.g., name_2, name_3, ...), and the chosen name is recorded as used. This behavior is only intended for cases where index prefixing is not enabled and SQLite requires global uniqueness for index names. """ if base_name not in self._seen_sqlite_index_names: self._seen_sqlite_index_names.add(base_name) return base_name # Base name already seen — assign next available counter next_num = self._sqlite_index_name_counters.get(base_name, 2) candidate = f"{base_name}_{next_num}" while candidate in self._seen_sqlite_index_names: next_num += 1 candidate = f"{base_name}_{next_num}" # Record chosen candidate and bump counter for the base name self._seen_sqlite_index_names.add(candidate) self._sqlite_index_name_counters[base_name] = next_num + 1 self._logger.info( 'Index "%s" renamed to "%s" to ensure uniqueness across the SQLite database.', base_name, candidate, ) return candidate def _build_create_table_sql(self, table_name: str) -> str: table_ident = self._quote_sqlite_identifier(table_name) sql: str = f"CREATE TABLE IF NOT EXISTS {table_ident} (" primary: str = "" indices: str = "" safe_table = self._escape_mysql_backticks(table_name) self._mysql_cur_dict.execute(f"SHOW COLUMNS FROM `{safe_table}`") rows: t.Sequence[t.Optional[t.Dict[str, RowItemType]]] = self._mysql_cur_dict.fetchall() primary_keys: int = sum(1 for row in rows if row is not None and row["Key"] == "PRI") for row in rows: if row is not None: column_type = self._translate_type_from_mysql_to_sqlite( column_type=row["Type"], # type: ignore[arg-type] sqlite_json1_extension_enabled=self._sqlite_json1_extension_enabled, ) if row["Key"] == "PRI" and row["Extra"] == "auto_increment" and primary_keys == 1: if column_type in Integer_Types: sql += "\n\t{name} INTEGER PRIMARY KEY AUTOINCREMENT,".format( name=self._quote_sqlite_identifier( str( row["Field"].decode() if isinstance(row["Field"], (bytes, bytearray)) else row["Field"] ) ), ) else: self._logger.warning( 'Primary key "%s" in table "%s" is not an INTEGER type! Skipping.', row["Field"], table_name, ) else: sql += "\n\t{name} {type} {notnull} {default} {collation},".format( name=self._quote_sqlite_identifier( str(row["Field"].decode() if isinstance(row["Field"], (bytes, bytearray)) else row["Field"]) ), type=column_type, notnull="NULL" if row["Null"] == "YES" else "NOT NULL", default=self._translate_default_from_mysql_to_sqlite(row["Default"], column_type, row["Extra"]), collation=self._data_type_collation_sequence(self._collation, column_type), ) self._mysql_cur_dict.execute( """ SELECT s.INDEX_NAME AS `name`, IF (NON_UNIQUE = 0 AND s.INDEX_NAME = 'PRIMARY', 1, 0) AS `primary`, IF (NON_UNIQUE = 0 AND s.INDEX_NAME <> 'PRIMARY', 1, 0) AS `unique`, {auto_increment} GROUP_CONCAT(s.COLUMN_NAME ORDER BY SEQ_IN_INDEX) AS `columns`, GROUP_CONCAT(c.COLUMN_TYPE ORDER BY SEQ_IN_INDEX) AS `types` FROM information_schema.STATISTICS AS s JOIN information_schema.COLUMNS AS c ON s.TABLE_SCHEMA = c.TABLE_SCHEMA AND s.TABLE_NAME = c.TABLE_NAME AND s.COLUMN_NAME = c.COLUMN_NAME WHERE s.TABLE_SCHEMA = %s AND s.TABLE_NAME = %s GROUP BY s.INDEX_NAME, s.NON_UNIQUE {group_by_extra} """.format( auto_increment=( "IF (c.EXTRA = 'auto_increment', 1, 0) AS `auto_increment`," if primary_keys == 1 else "0 as `auto_increment`," ), group_by_extra=" ,c.EXTRA" if primary_keys == 1 else "", ), (self._mysql_database, table_name), ) mysql_indices: t.Sequence[t.Optional[t.Dict[str, RowItemType]]] = self._mysql_cur_dict.fetchall() for index in mysql_indices: if index is not None: index_name: str if isinstance(index["name"], bytes): index_name = index["name"].decode() elif isinstance(index["name"], str): index_name = index["name"] else: index_name = str(index["name"]) # check if the index name collides with any table name self._mysql_cur_dict.execute( """ SELECT COUNT(*) AS `count` FROM information_schema.TABLES WHERE TABLE_SCHEMA = %s AND TABLE_NAME = %s """, (self._mysql_database, index_name), ) collision: t.Optional[t.Dict[str, RowItemType]] = self._mysql_cur_dict.fetchone() table_collisions: int = 0 if collision is not None: table_collisions = int(collision["count"]) # type: ignore[arg-type] columns: str = "" if isinstance(index["columns"], bytes): columns = index["columns"].decode() elif isinstance(index["columns"], str): columns = index["columns"] types: str = "" if isinstance(index["types"], bytes): types = index["types"].decode() elif isinstance(index["types"], str): types = index["types"] if len(columns) > 0: if index["primary"] in {1, "1"}: if (index["auto_increment"] not in {1, "1"}) or any( self._translate_type_from_mysql_to_sqlite( column_type=_type, sqlite_json1_extension_enabled=self._sqlite_json1_extension_enabled, ) not in Integer_Types for _type in types.split(",") ): primary += "\n\tPRIMARY KEY ({columns})".format( columns=", ".join( self._quote_sqlite_identifier(column.strip()) for column in columns.split(",") ) ) else: # Determine the SQLite index name, considering table name collisions and prefix option proposed_index_name = ( f"{table_name}_{index_name}" if (table_collisions > 0 or self._prefix_indices) else index_name ) # Ensure index name is unique across the whole SQLite database when prefixing is disabled if not self._prefix_indices: unique_index_name = self._get_unique_index_name(proposed_index_name) else: unique_index_name = proposed_index_name unique_kw = "UNIQUE " if index["unique"] in {1, "1"} else "" indices += """CREATE {unique}INDEX IF NOT EXISTS {name} ON {table} ({columns});""".format( unique=unique_kw, name=self._quote_sqlite_identifier(unique_index_name), table=self._quote_sqlite_identifier(table_name), columns=", ".join( self._quote_sqlite_identifier(column.strip()) for column in columns.split(",") ), ) sql += primary sql = sql.rstrip(", ") if not self._without_tables and not self._without_foreign_keys: server_version: t.Optional[t.Tuple[int, ...]] = self._mysql.get_server_version() self._mysql_cur_dict.execute( """ SELECT k.COLUMN_NAME AS `column`, k.REFERENCED_TABLE_NAME AS `ref_table`, k.REFERENCED_COLUMN_NAME AS `ref_column`, c.UPDATE_RULE AS `on_update`, c.DELETE_RULE AS `on_delete` FROM information_schema.TABLE_CONSTRAINTS AS i {JOIN} information_schema.KEY_COLUMN_USAGE AS k ON i.CONSTRAINT_NAME = k.CONSTRAINT_NAME AND i.TABLE_NAME = k.TABLE_NAME {JOIN} information_schema.REFERENTIAL_CONSTRAINTS AS c ON c.CONSTRAINT_NAME = i.CONSTRAINT_NAME AND c.TABLE_NAME = i.TABLE_NAME WHERE i.TABLE_SCHEMA = %s AND i.TABLE_NAME = %s AND i.CONSTRAINT_TYPE = %s GROUP BY i.CONSTRAINT_NAME, k.COLUMN_NAME, k.REFERENCED_TABLE_NAME, k.REFERENCED_COLUMN_NAME, c.UPDATE_RULE, c.DELETE_RULE """.format( JOIN=( "JOIN" if (server_version is not None and server_version[0] == 8 and server_version[2] > 19) else "LEFT JOIN" ) ), (self._mysql_database, table_name, "FOREIGN KEY"), ) for foreign_key in self._mysql_cur_dict.fetchall(): if foreign_key is not None: col = self._quote_sqlite_identifier( foreign_key["column"].decode() if isinstance(foreign_key["column"], (bytes, bytearray)) else str(foreign_key["column"]) # type: ignore[index] ) ref_table = self._quote_sqlite_identifier( foreign_key["ref_table"].decode() if isinstance(foreign_key["ref_table"], (bytes, bytearray)) else str(foreign_key["ref_table"]) # type: ignore[index] ) ref_col = self._quote_sqlite_identifier( foreign_key["ref_column"].decode() if isinstance(foreign_key["ref_column"], (bytes, bytearray)) else str(foreign_key["ref_column"]) # type: ignore[index] ) on_update = str(foreign_key["on_update"] or "NO ACTION").upper() # type: ignore[index] on_delete = str(foreign_key["on_delete"] or "NO ACTION").upper() # type: ignore[index] sql += ( f",\n\tFOREIGN KEY({col}) REFERENCES {ref_table} ({ref_col}) " f"ON UPDATE {on_update} " f"ON DELETE {on_delete}" ) sql += "\n)" if self._sqlite_strict: sql += " STRICT" sql += ";\n" sql += indices return sql def _create_table(self, table_name: str, attempting_reconnect: bool = False) -> None: try: if attempting_reconnect: self._mysql.reconnect() self._sqlite_cur.executescript(self._build_create_table_sql(table_name)) self._sqlite.commit() except mysql.connector.Error as err: if err.errno == errorcode.CR_SERVER_LOST: if not attempting_reconnect: self._logger.warning("Connection to MySQL server lost.\nAttempting to reconnect.") self._create_table(table_name, True) return else: self._logger.warning("Connection to MySQL server lost.\nReconnection attempt aborted.") raise self._logger.error( "MySQL failed reading table definition from table %s: %s", table_name, err, ) raise except sqlite3.Error as err: self._logger.error("SQLite failed creating table %s: %s", table_name, err) raise def _mysql_viewdef_to_sqlite(self, view_select_sql: str, view_name: str) -> str: """Convert a MySQL VIEW_DEFINITION (a SELECT ...) to a SQLite CREATE VIEW statement.""" # Normalize whitespace and avoid double semicolons in output cleaned_sql = view_select_sql.strip().rstrip(";") try: tree: Expression = parse_one(cleaned_sql, read="mysql") except (ParseError, ValueError, AttributeError, TypeError): # Fallback: try to remove schema qualifiers if requested, then return stripped_sql = cleaned_sql # Remove qualifiers `schema`.tbl or "schema".tbl or schema.tbl sn: str = re.escape(self._mysql_database) for pat in (rf"`{sn}`\.", rf'"{sn}"\.', rf"\b{sn}\."): stripped_sql = re.sub(pat, "", stripped_sql, flags=re.IGNORECASE) view_ident = self._quote_sqlite_identifier(view_name) return f"CREATE VIEW IF NOT EXISTS {view_ident} AS\n{stripped_sql};" # Remove schema qualifiers that match schema_name on tables for tbl in tree.find_all(exp.Table): db = tbl.args.get("db") if db and db.name.strip('`"').lower() == self._mysql_database.lower(): tbl.set("db", None) # Also remove schema qualifiers on fully-qualified columns (db.table.column) for col in tree.find_all(exp.Column): db = col.args.get("db") if db and db.name.strip('`"').lower() == self._mysql_database.lower(): col.set("db", None) sqlite_select: str = tree.sql(dialect="sqlite") view_ident = self._quote_sqlite_identifier(view_name) return f"CREATE VIEW IF NOT EXISTS {view_ident} AS\n{sqlite_select};" def _build_create_view_sql(self, view_name: str) -> str: """Build a CREATE VIEW statement for SQLite from a MySQL VIEW definition.""" # Try to obtain the view definition from information_schema.VIEWS definition: t.Optional[str] = None try: self._mysql_cur_dict.execute( """ SELECT VIEW_DEFINITION AS `definition` FROM information_schema.VIEWS WHERE TABLE_SCHEMA = %s AND TABLE_NAME = %s """, (self._mysql_database, view_name), ) row: t.Optional[t.Dict[str, RowItemType]] = self._mysql_cur_dict.fetchone() if row is not None and row.get("definition") is not None: val = row["definition"] if isinstance(val, bytes): try: definition = val.decode() except UnicodeDecodeError: definition = str(val) else: definition = t.cast(str, val) except mysql.connector.Error: # Fall back to SHOW CREATE VIEW below definition = None if not definition: # Fallback: use SHOW CREATE VIEW and extract the SELECT part try: # Escape backticks in the MySQL view name for safe interpolation safe_view_name = view_name.replace("`", "``") self._mysql_cur.execute(f"SHOW CREATE VIEW `{safe_view_name}`") res = self._mysql_cur.fetchone() if res and len(res) >= 2: create_stmt = res[1] if isinstance(create_stmt, bytes): try: create_stmt_str = create_stmt.decode() except UnicodeDecodeError: create_stmt_str = str(create_stmt) else: create_stmt_str = t.cast(str, create_stmt) # Extract the SELECT ... part after AS (supporting newlines) m = re.search(r"\bAS\b\s*(.*)$", create_stmt_str, re.IGNORECASE | re.DOTALL) if m: definition = m.group(1).strip().rstrip(";") else: # As a last resort, try to use the full statement replacing the prefix # Not ideal, but better than failing outright idx = create_stmt_str.upper().find(" AS ") if idx != -1: definition = create_stmt_str[idx + 4 :].strip().rstrip(";") except mysql.connector.Error: pass if not definition: raise sqlite3.Error(f"Unable to fetch definition for MySQL view '{view_name}'") return self._mysql_viewdef_to_sqlite( view_name=view_name, view_select_sql=definition, ) def _create_view(self, view_name: str, attempting_reconnect: bool = False) -> None: try: if attempting_reconnect: self._mysql.reconnect() sql = self._build_create_view_sql(view_name) self._sqlite_cur.execute(sql) self._sqlite.commit() except mysql.connector.Error as err: if err.errno == errorcode.CR_SERVER_LOST: if not attempting_reconnect: self._logger.warning("Connection to MySQL server lost.\nAttempting to reconnect.") self._create_view(view_name, True) return else: self._logger.warning("Connection to MySQL server lost.\nReconnection attempt aborted.") raise self._logger.error( "MySQL failed reading view definition from view %s: %s", view_name, err, ) raise except sqlite3.Error as err: self._logger.error("SQLite failed creating view %s: %s", view_name, err) raise def _transfer_table_data( self, table_name: str, sql: str, total_records: int = 0, attempting_reconnect: bool = False ) -> None: if attempting_reconnect: self._mysql.reconnect() try: if self._chunk_size is not None and self._chunk_size > 0: for chunk in trange( self._current_chunk_number, int(ceil(total_records / self._chunk_size)), disable=self._quiet, ): self._current_chunk_number = chunk self._sqlite_cur.executemany( sql, ( tuple(encode_data_for_sqlite(col) if col is not None else None for col in row) for row in self._mysql_cur.fetchmany(self._chunk_size) ), ) else: self._sqlite_cur.executemany( sql, ( tuple(encode_data_for_sqlite(col) if col is not None else None for col in row) for row in tqdm( self._mysql_cur.fetchall(), total=total_records, disable=self._quiet, ) ), ) self._sqlite.commit() except mysql.connector.Error as err: if err.errno == errorcode.CR_SERVER_LOST: if not attempting_reconnect: self._logger.warning("Connection to MySQL server lost.\nAttempting to reconnect.") self._transfer_table_data( table_name=table_name, sql=sql, total_records=total_records, attempting_reconnect=True, ) return else: self._logger.warning("Connection to MySQL server lost.\nReconnection attempt aborted.") raise self._logger.error( "MySQL transfer failed reading table data from table %s: %s", table_name, err, ) raise except sqlite3.Error as err: self._logger.error( "SQLite transfer failed inserting data into table %s: %s", table_name, err, ) raise
[docs] def transfer(self) -> None: """The primary and only method with which we transfer all the data.""" if len(self._mysql_tables) > 0 or len(self._exclude_mysql_tables) > 0: # transfer only specific tables specific_tables: t.Sequence[str] = ( self._exclude_mysql_tables if len(self._exclude_mysql_tables) > 0 else self._mysql_tables ) self._mysql_cur_prepared.execute( """ SELECT TABLE_NAME, TABLE_TYPE FROM information_schema.TABLES WHERE TABLE_SCHEMA = SCHEMA() AND TABLE_NAME {exclude} IN ({placeholders}) """.format( exclude="NOT" if len(self._exclude_mysql_tables) > 0 else "", placeholders=("%s, " * len(specific_tables)).rstrip(" ,"), ), specific_tables, ) tables: t.Iterable[t.Tuple[str, str]] = ( ( str(row[0].decode() if isinstance(row[0], (bytes, bytearray)) else row[0]), str(row[1].decode() if isinstance(row[1], (bytes, bytearray)) else row[1]), ) for row in self._mysql_cur_prepared.fetchall() ) else: # transfer all tables self._mysql_cur.execute( """ SELECT TABLE_NAME, TABLE_TYPE FROM information_schema.TABLES WHERE TABLE_SCHEMA = SCHEMA() """ ) def _coerce_row(row: t.Any) -> t.Tuple[str, str]: try: # Row like (name, type) name = row[0].decode() if isinstance(row[0], (bytes, bytearray)) else row[0] ttype = ( row[1].decode() if (isinstance(row, (list, tuple)) and len(row) > 1 and isinstance(row[1], (bytes, bytearray))) else (row[1] if (isinstance(row, (list, tuple)) and len(row) > 1) else "BASE TABLE") ) return str(name), str(ttype) except (TypeError, IndexError, UnicodeDecodeError): # Fallback: treat as a single value name when row is not a 2-tuple or decoding fails name = row.decode() if isinstance(row, (bytes, bytearray)) else str(row) return name, "BASE TABLE" tables = (_coerce_row(row) for row in self._mysql_cur.fetchall()) try: # turn off foreign key checking in SQLite while transferring data self._sqlite_cur.execute("PRAGMA foreign_keys=OFF") for table_name, table_type in tables: if isinstance(table_name, bytes): table_name = table_name.decode() if isinstance(table_type, bytes): table_type = table_type.decode() self._logger.info( "%s%sTransferring table %s", "[WITHOUT DATA] " if self._without_data else "", "[ONLY DATA] " if self._without_tables else "", table_name, ) # reset the chunk self._current_chunk_number = 0 if not self._without_tables: # create the table or view if table_type == "VIEW" and self._views_as_views: self._create_view(table_name) # type: ignore[arg-type] else: self._create_table(table_name) # type: ignore[arg-type] if not self._without_data and not (table_type == "VIEW" and self._views_as_views): # get the size of the data if self._limit_rows > 0: # limit to the requested number of rows safe_table = self._escape_mysql_backticks(table_name) self._mysql_cur_dict.execute( "SELECT COUNT(*) AS `total_records` " f"FROM (SELECT * FROM `{safe_table}` LIMIT {self._limit_rows}) AS `table`" ) else: # get all rows safe_table = self._escape_mysql_backticks(table_name) self._mysql_cur_dict.execute(f"SELECT COUNT(*) AS `total_records` FROM `{safe_table}`") total_records: t.Optional[t.Dict[str, RowItemType]] = self._mysql_cur_dict.fetchone() if total_records is not None: total_records_count: int = int(total_records["total_records"]) # type: ignore[arg-type] else: total_records_count = 0 # only continue if there is anything to transfer if total_records_count > 0: # populate it safe_table = self._escape_mysql_backticks(table_name) self._mysql_cur.execute( "SELECT * FROM `{table_name}` {limit}".format( table_name=safe_table, limit=f"LIMIT {self._limit_rows}" if self._limit_rows > 0 else "", ) ) columns: t.Tuple[str, ...] = tuple(column[0] for column in self._mysql_cur.description) # type: ignore[union-attr] # build the SQL string sql = """ INSERT OR IGNORE INTO "{table}" ({fields}) VALUES ({placeholders}) """.format( table=table_name, fields=('"{}", ' * len(columns)).rstrip(" ,").format(*columns), placeholders=("?, " * len(columns)).rstrip(" ,"), ) self._transfer_table_data( table_name=table_name, # type: ignore[arg-type] sql=sql, total_records=total_records_count, ) except Exception: # pylint: disable=W0706 raise finally: # re-enable foreign key checking once done transferring self._sqlite_cur.execute("PRAGMA foreign_keys=ON") if self._vacuum: self._logger.info("Vacuuming created SQLite database file.\nThis might take a while.") self._sqlite_cur.execute("VACUUM") self._logger.info("Done!")