"""Use to transfer a MySQL database to SQLite."""
import logging
import os
import re
import sqlite3
import typing as t
from datetime import timedelta
from decimal import Decimal
from math import ceil
from os.path import realpath
from sys import stdout
import mysql.connector
from mysql.connector import CharacterSet, errorcode
from mysql.connector.abstracts import MySQLConnectionAbstract
from mysql.connector.types import RowItemType
from sqlglot import Expression, exp, parse_one
from sqlglot.errors import ParseError
from tqdm import tqdm, trange
try:
# Python 3.11+
from typing import Unpack # type: ignore[attr-defined]
except ImportError:
# Python < 3.11
from typing_extensions import Unpack # type: ignore
from mysql_to_sqlite3.mysql_utils import CHARSET_INTRODUCERS
from mysql_to_sqlite3.sqlite_utils import (
CollatingSequences,
Integer_Types,
adapt_decimal,
adapt_timedelta,
convert_date,
convert_decimal,
convert_timedelta,
encode_data_for_sqlite,
)
from mysql_to_sqlite3.types import MySQLtoSQLiteAttributes, MySQLtoSQLiteParams
class MySQLtoSQLite(MySQLtoSQLiteAttributes):
"""Use this class to transfer a MySQL database to SQLite."""
COLUMN_PATTERN: t.Pattern[str] = re.compile(r"^[^(]+")
COLUMN_LENGTH_PATTERN: t.Pattern[str] = re.compile(r"\(\d+\)$")
def __init__(self, **kwargs: Unpack[MySQLtoSQLiteParams]) -> None:
"""Constructor."""
if kwargs.get("mysql_database") is not None:
self._mysql_database = str(kwargs.get("mysql_database"))
else:
raise ValueError("Please provide a MySQL database")
if kwargs.get("mysql_user") is not None:
self._mysql_user = str(kwargs.get("mysql_user"))
else:
raise ValueError("Please provide a MySQL user")
if kwargs.get("sqlite_file") is None:
raise ValueError("Please provide an SQLite file")
else:
self._sqlite_file = realpath(str(kwargs.get("sqlite_file")))
password: t.Optional[t.Union[str, bool]] = kwargs.get("mysql_password")
self._mysql_password = password if isinstance(password, str) else None
self._mysql_host = kwargs.get("mysql_host", "localhost") or "localhost"
self._mysql_port = kwargs.get("mysql_port", 3306) or 3306
self._mysql_charset = kwargs.get("mysql_charset", "utf8mb4") or "utf8mb4"
self._mysql_collation = (
kwargs.get("mysql_collation") or CharacterSet().get_default_collation(self._mysql_charset.lower())[0]
)
if not kwargs.get("mysql_collation") and self._mysql_collation == "utf8mb4_0900_ai_ci":
self._mysql_collation = "utf8mb4_unicode_ci"
self._mysql_tables = kwargs.get("mysql_tables") or tuple()
self._exclude_mysql_tables = kwargs.get("exclude_mysql_tables") or tuple()
if bool(self._mysql_tables) and bool(self._exclude_mysql_tables):
raise ValueError("mysql_tables and exclude_mysql_tables are mutually exclusive")
self._limit_rows = kwargs.get("limit_rows", 0) or 0
if kwargs.get("collation") is not None and str(kwargs.get("collation")).upper() in {
CollatingSequences.BINARY,
CollatingSequences.NOCASE,
CollatingSequences.RTRIM,
}:
self._collation = str(kwargs.get("collation")).upper()
else:
self._collation = CollatingSequences.BINARY
self._prefix_indices = kwargs.get("prefix_indices", False) or False
if bool(self._mysql_tables) or bool(self._exclude_mysql_tables):
self._without_foreign_keys = True
else:
self._without_foreign_keys = bool(kwargs.get("without_foreign_keys", False))
self._without_data = bool(kwargs.get("without_data", False))
self._without_tables = bool(kwargs.get("without_tables", False))
if self._without_tables and self._without_data:
raise ValueError("Unable to continue without transferring data or creating tables!")
self._mysql_ssl_disabled = bool(kwargs.get("mysql_ssl_disabled", False))
self._current_chunk_number = 0
self._chunk_size = kwargs.get("chunk") or None
self._buffered = bool(kwargs.get("buffered", False))
self._vacuum = bool(kwargs.get("vacuum", False))
self._quiet = bool(kwargs.get("quiet", False))
self._views_as_views = bool(kwargs.get("views_as_views", True))
self._sqlite_strict = bool(kwargs.get("sqlite_strict", False))
self._logger = self._setup_logger(log_file=kwargs.get("log_file") or None, quiet=self._quiet)
        if self._sqlite_strict and sqlite3.sqlite_version_info < (3, 37, 0):
self._logger.warning(
"SQLite version %s does not support STRICT tables. Tables will be created without strict mode.",
sqlite3.sqlite_version,
)
self._sqlite_strict = False
sqlite3.register_adapter(Decimal, adapt_decimal)
sqlite3.register_converter("DECIMAL", convert_decimal)
sqlite3.register_adapter(timedelta, adapt_timedelta)
sqlite3.register_converter("DATE", convert_date)
sqlite3.register_converter("TIME", convert_timedelta)
self._sqlite = sqlite3.connect(realpath(self._sqlite_file), detect_types=sqlite3.PARSE_DECLTYPES)
self._sqlite.row_factory = sqlite3.Row
self._sqlite_cur = self._sqlite.cursor()
self._json_as_text = bool(kwargs.get("json_as_text", False))
self._sqlite_json1_extension_enabled = not self._json_as_text and self._check_sqlite_json1_extension_enabled()
# Track seen SQLite index names to generate unique names when prefixing is disabled
self._seen_sqlite_index_names: t.Set[str] = set()
# Counter for duplicate index names to assign numeric suffixes (name_2, name_3, ...)
self._sqlite_index_name_counters: t.Dict[str, int] = {}
try:
_mysql_connection = mysql.connector.connect(
user=self._mysql_user,
password=self._mysql_password,
host=self._mysql_host,
port=self._mysql_port,
ssl_disabled=self._mysql_ssl_disabled,
charset=self._mysql_charset,
collation=self._mysql_collation,
)
if isinstance(_mysql_connection, MySQLConnectionAbstract):
self._mysql = _mysql_connection
else:
raise ConnectionError("Unable to connect to MySQL")
if not self._mysql.is_connected():
raise ConnectionError("Unable to connect to MySQL")
self._mysql_cur = self._mysql.cursor(buffered=self._buffered, raw=True) # type: ignore[assignment]
self._mysql_cur_prepared = self._mysql.cursor(prepared=True) # type: ignore[assignment]
self._mysql_cur_dict = self._mysql.cursor( # type: ignore[assignment]
buffered=self._buffered,
dictionary=True,
)
try:
self._mysql.database = self._mysql_database
            except Exception as err:  # includes mysql.connector.Error
if hasattr(err, "errno") and err.errno == errorcode.ER_BAD_DB_ERROR:
self._logger.error("MySQL Database does not exist!")
raise
self._logger.error(err)
raise
except mysql.connector.Error as err:
self._logger.error(err)
raise
@classmethod
def _setup_logger(
cls, log_file: t.Optional[t.Union[str, "os.PathLike[t.Any]"]] = None, quiet: bool = False
) -> logging.Logger:
formatter: logging.Formatter = logging.Formatter(
fmt="%(asctime)s %(levelname)-8s %(message)s", datefmt="%Y-%m-%d %H:%M:%S"
)
logger: logging.Logger = logging.getLogger(cls.__name__)
logger.setLevel(logging.DEBUG)
if not quiet:
screen_handler = logging.StreamHandler(stream=stdout)
screen_handler.setFormatter(formatter)
logger.addHandler(screen_handler)
if log_file:
file_handler = logging.FileHandler(realpath(log_file), mode="w")
file_handler.setFormatter(formatter)
logger.addHandler(file_handler)
return logger
@classmethod
def _valid_column_type(cls, column_type: str) -> t.Optional[t.Match[str]]:
return cls.COLUMN_PATTERN.match(column_type.strip())
@classmethod
def _column_type_length(cls, column_type: str) -> str:
suffix: t.Optional[t.Match[str]] = cls.COLUMN_LENGTH_PATTERN.search(column_type)
if suffix:
return suffix.group(0)
return ""
@staticmethod
def _decode_column_type(column_type: t.Union[str, bytes]) -> str:
if isinstance(column_type, str):
return column_type
if isinstance(column_type, bytes):
try:
return column_type.decode()
except (UnicodeDecodeError, AttributeError):
pass
return str(column_type)
@classmethod
def _translate_type_from_mysql_to_sqlite(
        cls, column_type: t.Union[str, bytes], sqlite_json1_extension_enabled: bool = False
) -> str:
_column_type: str = cls._decode_column_type(column_type)
        # This could be optimized even further; however, it seems adequate.
match: t.Optional[t.Match[str]] = cls._valid_column_type(_column_type)
if not match:
raise ValueError(f'"{_column_type}" is not a valid column_type!')
data_type: str = match.group(0).upper()
if data_type.endswith(" UNSIGNED"):
data_type = data_type.replace(" UNSIGNED", "")
if data_type in {
"BIGINT",
"BLOB",
"BOOLEAN",
"DATE",
"DATETIME",
"DECIMAL",
"DOUBLE",
"FLOAT",
"INTEGER",
"MEDIUMINT",
"NUMERIC",
"REAL",
"SMALLINT",
"TIME",
"TINYINT",
"YEAR",
}:
return data_type
if data_type == "DOUBLE PRECISION":
return "DOUBLE"
if data_type == "FIXED":
return "DECIMAL"
if data_type in {"CHARACTER VARYING", "CHAR VARYING"}:
return "VARCHAR" + cls._column_type_length(_column_type)
if data_type in {"NATIONAL CHARACTER VARYING", "NATIONAL CHAR VARYING", "NATIONAL VARCHAR"}:
return "NVARCHAR" + cls._column_type_length(_column_type)
if data_type == "NATIONAL CHARACTER":
return "NCHAR" + cls._column_type_length(_column_type)
if data_type in {
"BIT",
"BINARY",
"LONGBLOB",
"MEDIUMBLOB",
"TINYBLOB",
"VARBINARY",
}:
return "BLOB"
if data_type in {"NCHAR", "NVARCHAR", "VARCHAR"}:
return data_type + cls._column_type_length(_column_type)
if data_type == "CHAR":
return "CHARACTER" + cls._column_type_length(_column_type)
if data_type == "INT":
return "INTEGER"
if data_type in "TIMESTAMP":
return "DATETIME"
if data_type == "JSON" and sqlite_json1_extension_enabled:
return "JSON"
# As a last resort, try sqlglot to derive a better SQLite-compatible type
sqlglot_type: t.Optional[str] = cls._transpile_mysql_type_to_sqlite(
_column_type, sqlite_json1_extension_enabled=sqlite_json1_extension_enabled
)
if sqlglot_type:
return sqlglot_type
return "TEXT"
@classmethod
def _transpile_mysql_expr_to_sqlite(cls, expr_sql: str) -> t.Optional[str]:
"""Transpile a MySQL scalar expression to SQLite using sqlglot.
Returns the SQLite SQL string on success, or None on failure.
"""
cleaned: str = expr_sql.strip().rstrip(";")
try:
tree: Expression = parse_one(cleaned, read="mysql")
return tree.sql(dialect="sqlite")
except (ParseError, ValueError):
return None
except (AttributeError, TypeError): # pragma: no cover - unexpected sqlglot failure
            logging.getLogger(cls.__name__).debug("sqlglot failed to transpile expr: %r", expr_sql)
return None
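    # For example (assuming sqlglot handles this dialect pair as expected),
    # _transpile_mysql_expr_to_sqlite("CURRENT_TIMESTAMP()") would yield
    # "CURRENT_TIMESTAMP", while unparseable input yields None.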
@classmethod
def _normalize_literal_with_sqlglot(cls, expr_sql: str) -> t.Optional[str]:
"""Normalize a MySQL literal using sqlglot, returning SQLite SQL if literal-like."""
cleaned: str = expr_sql.strip().rstrip(";")
try:
node: Expression = parse_one(cleaned, read="mysql")
except (ParseError, ValueError):
return None
if isinstance(node, exp.Literal):
return node.sql(dialect="sqlite")
if isinstance(node, exp.Paren) and isinstance(node.this, exp.Literal):
return node.this.sql(dialect="sqlite")
return None
@staticmethod
def _quote_sqlite_identifier(name: t.Union[str, bytes, bytearray]) -> str:
"""Safely quote an identifier for SQLite using sqlglot.
Always returns a double-quoted identifier, doubling any embedded quotes.
Accepts bytes and decodes them to str when needed.
"""
if isinstance(name, (bytes, bytearray)):
try:
s: str = name.decode()
except (UnicodeDecodeError, AttributeError):
s = str(name)
else:
s = str(name)
try:
# Normalize identifier using sqlglot, then wrap in quotes regardless
normalized: str = exp.to_identifier(s).name
except (AttributeError, ValueError, TypeError): # pragma: no cover - extremely unlikely
normalized = s
replaced: str = normalized.replace('"', '""')
return f'"{replaced}"'
@staticmethod
def _escape_mysql_backticks(identifier: str) -> str:
"""Escape backticks in a MySQL identifier for safe backtick quoting."""
return identifier.replace("`", "``")
@classmethod
def _transpile_mysql_type_to_sqlite(
cls, column_type: str, sqlite_json1_extension_enabled: bool = False
) -> t.Optional[str]:
"""Attempt to derive a suitable SQLite column type using sqlglot.
This is used as a last-resort fallback when the built-in mapper does not
recognize a MySQL type or synonym. It keeps existing behavior for known
types and only augments unknowns.
"""
# Wrap the type in a CAST expression so sqlglot can parse it consistently.
expr_sql: str = f"CAST(NULL AS {column_type.strip()})"
try:
tree: Expression = parse_one(expr_sql, read="mysql")
rendered: str = tree.sql(dialect="sqlite")
except (ParseError, ValueError, AttributeError, TypeError):
return None
# Extract the type inside CAST(NULL AS ...)
m: t.Optional[t.Match[str]] = re.search(r"CAST\(NULL AS\s+([^)]+)\)", rendered, re.IGNORECASE)
if not m:
return None
extracted: str = m.group(1).strip()
upper: str = extracted.upper()
# JSON handling: respect availability of JSON1 extension
if "JSON" in upper:
return "JSON" if sqlite_json1_extension_enabled else "TEXT"
# Split out optional length suffix like (255) or (10,2)
base: str = upper
length_suffix: str = ""
paren: t.Optional[t.Match[str]] = re.match(r"^([A-Z ]+)(\(.*\))$", upper)
if paren:
base = paren.group(1).strip()
length_suffix = paren.group(2)
# Minimal synonym normalization
synonyms: t.Dict[str, str] = {
"DOUBLE PRECISION": "DOUBLE",
"FIXED": "DECIMAL",
"CHAR VARYING": "VARCHAR",
"CHARACTER VARYING": "VARCHAR",
"NATIONAL VARCHAR": "NVARCHAR",
"NATIONAL CHARACTER VARYING": "NVARCHAR",
"NATIONAL CHAR VARYING": "NVARCHAR",
"NATIONAL CHARACTER": "NCHAR",
}
base = synonyms.get(base, base)
# Decide the final SQLite type, aligning with existing conventions
if base in {"NCHAR", "NVARCHAR", "VARCHAR"} and length_suffix:
return f"{base}{length_suffix}"
if base in {"CHAR", "CHARACTER"}:
return f"CHARACTER{length_suffix}"
if base in {"DECIMAL", "NUMERIC"}:
# Keep without length to match the existing mapper's style
return base
if base in {
"DOUBLE",
"REAL",
"FLOAT",
"INTEGER",
"BIGINT",
"SMALLINT",
"MEDIUMINT",
"TINYINT",
"BLOB",
"DATE",
"DATETIME",
"TIME",
"YEAR",
"BOOLEAN",
}:
return base
if base in {"VARBINARY", "BINARY", "TINYBLOB", "MEDIUMBLOB", "LONGBLOB"}:
return "BLOB"
if base in {"TEXT", "TINYTEXT", "MEDIUMTEXT", "LONGTEXT", "CLOB"}:
return "TEXT"
# ENUM/SET and other complex types -> keep default behavior (TEXT)
if base in {"ENUM", "SET"}:
return "TEXT"
# If we reached here, we didn't find a better mapping
return None
@classmethod
def _translate_default_from_mysql_to_sqlite(
cls,
column_default: RowItemType = None,
column_type: t.Optional[str] = None,
column_extra: RowItemType = None,
) -> str:
is_binary: bool
is_hex: bool
if isinstance(column_default, bytes):
if column_type in {
"BIT",
"BINARY",
"BLOB",
"LONGBLOB",
"MEDIUMBLOB",
"TINYBLOB",
"VARBINARY",
}:
if column_extra in {"DEFAULT_GENERATED", "default_generated"}:
for charset_introducer in CHARSET_INTRODUCERS:
if column_default.startswith(charset_introducer.encode()):
is_binary = False
is_hex = False
for b_prefix in ("B", "b"):
if column_default.startswith(rf"{charset_introducer} {b_prefix}\'".encode()):
is_binary = True
break
for x_prefix in ("X", "x"):
if column_default.startswith(rf"{charset_introducer} {x_prefix}\'".encode()):
is_hex = True
break
column_default = (
column_default.replace(charset_introducer.encode(), b"")
.replace(rb"x\'", b"")
.replace(rb"X\'", b"")
.replace(rb"b\'", b"")
.replace(rb"B\'", b"")
.replace(rb"\'", b"")
.replace(rb"'", b"")
.strip()
)
if is_binary:
return f"DEFAULT '{chr(int(column_default, 2))}'"
if is_hex:
return f"DEFAULT x'{column_default.decode()}'"
break
return f"DEFAULT x'{column_default.hex()}'"
try:
column_default = column_default.decode()
except (UnicodeDecodeError, AttributeError):
pass
if column_default is None:
return ""
if isinstance(column_default, bool):
if column_type == "BOOLEAN" and sqlite3.sqlite_version >= "3.23.0":
if column_default:
return "DEFAULT(TRUE)"
return "DEFAULT(FALSE)"
return f"DEFAULT '{int(column_default)}'"
if isinstance(column_default, str):
if column_default.lower() == "curtime()":
return "DEFAULT CURRENT_TIME"
if column_default.lower() == "curdate()":
return "DEFAULT CURRENT_DATE"
if column_default.lower() in {"current_timestamp()", "now()"}:
return "DEFAULT CURRENT_TIMESTAMP"
if column_extra in {"DEFAULT_GENERATED", "default_generated"}:
if column_default.upper() in {
"CURRENT_TIME",
"CURRENT_DATE",
"CURRENT_TIMESTAMP",
}:
return f"DEFAULT {column_default.upper()}"
for charset_introducer in CHARSET_INTRODUCERS:
if column_default.startswith(charset_introducer):
is_binary = False
is_hex = False
for b_prefix in ("B", "b"):
if column_default.startswith(rf"{charset_introducer} {b_prefix}\'"):
is_binary = True
break
for x_prefix in ("X", "x"):
if column_default.startswith(rf"{charset_introducer} {x_prefix}\'"):
is_hex = True
break
column_default = (
column_default.replace(charset_introducer, "")
.replace(r"x\'", "")
.replace(r"X\'", "")
.replace(r"b\'", "")
.replace(r"B\'", "")
.replace(r"\'", "")
.replace(r"'", "")
.strip()
)
if is_binary:
return f"DEFAULT '{chr(int(column_default, 2))}'"
if is_hex:
return f"DEFAULT x'{column_default}'"
return f"DEFAULT '{column_default}'"
transpiled: t.Optional[str] = cls._transpile_mysql_expr_to_sqlite(column_default)
if transpiled:
norm: str = transpiled.strip().rstrip(";")
upper: str = norm.upper()
if upper in {"CURRENT_TIME", "CURRENT_DATE", "CURRENT_TIMESTAMP"}:
return f"DEFAULT {upper}"
if upper == "NULL":
return "DEFAULT NULL"
# Allow blob hex literal X'..'
if re.match(r"^[Xx]'[0-9A-Fa-f]+'$", norm):
return f"DEFAULT {norm}"
# Support boolean tokens when provided as generated strings
if upper in {"TRUE", "FALSE"}:
if column_type == "BOOLEAN" and sqlite3.sqlite_version >= "3.23.0":
return f"DEFAULT({upper})"
return f"DEFAULT '{1 if upper == 'TRUE' else 0}'"
# Unwrap a single layer of parenthesis around a literal
if norm.startswith("(") and norm.endswith(")"):
inner = norm[1:-1].strip()
if (inner.startswith("'") and inner.endswith("'")) or re.match(r"^-?\d+(?:\.\d+)?$", inner):
return f"DEFAULT {inner}"
# If the expression is arithmetic-only over numeric literals, allow as-is
if re.match(r"^[\d\.\s\+\-\*/\(\)]+$", norm) and any(ch.isdigit() for ch in norm):
return f"DEFAULT {norm}"
# Allow numeric or single-quoted string literals as-is
if (norm.startswith("'") and norm.endswith("'")) or re.match(r"^-?\d+(?:\.\d+)?$", norm):
return f"DEFAULT {norm}"
stripped_default = column_default.strip()
if stripped_default.startswith("'") or (
stripped_default.startswith("(") and stripped_default.endswith(")")
):
normalized_literal: t.Optional[str] = cls._normalize_literal_with_sqlglot(column_default)
if normalized_literal is not None:
return f"DEFAULT {normalized_literal}"
# Fallback: robustly escape single quotes for plain string defaults
_escaped = column_default.replace("\\'", "'")
_escaped = _escaped.replace("'", "''")
return f"DEFAULT '{_escaped}'"
s = str(column_default)
s = s.replace("\\'", "'")
s = s.replace("'", "''")
return f"DEFAULT '{s}'"
@classmethod
def _data_type_collation_sequence(
cls, collation: str = CollatingSequences.BINARY, column_type: t.Optional[str] = None
) -> str:
"""Return a SQLite COLLATE clause for textual affinity types.
Augmented with sqlglot: if the provided type string does not match the
quick textual prefixes, we attempt to transpile it to a SQLite type and
then apply SQLite's textual affinity rules (contains CHAR/CLOB/TEXT or
their NV*/VAR* variants). This improves handling of MySQL synonyms like
CHAR VARYING / CHARACTER VARYING / NATIONAL CHARACTER VARYING.
"""
if not column_type or collation == CollatingSequences.BINARY:
return ""
ct: str = column_type.strip()
upper: str = ct.upper()
# Fast-path for already normalized SQLite textual types
if upper.startswith(("CHARACTER", "NCHAR", "NVARCHAR", "TEXT", "VARCHAR")):
return f"COLLATE {collation}"
# Avoid collations for JSON/BLOB explicitly
if "JSON" in upper or "BLOB" in upper:
return ""
# If the type string obviously denotes text affinity, apply collation
if any(tok in upper for tok in ("VARCHAR", "NVARCHAR", "NCHAR", "CHAR", "TEXT", "CLOB", "CHARACTER")):
return f"COLLATE {collation}"
# Try to map uncommon/synonym types to a SQLite type using sqlglot-based transpiler
mapped: t.Optional[str] = cls._transpile_mysql_type_to_sqlite(ct)
if mapped:
mu = mapped.upper()
if (
"CHAR" in mu or "VARCHAR" in mu or "NCHAR" in mu or "NVARCHAR" in mu or "TEXT" in mu or "CLOB" in mu
) and not ("JSON" in mu or "BLOB" in mu):
return f"COLLATE {collation}"
return ""
def _check_sqlite_json1_extension_enabled(self) -> bool:
try:
self._sqlite_cur.execute("PRAGMA compile_options")
return "ENABLE_JSON1" in set(row[0] for row in self._sqlite_cur.fetchall())
except sqlite3.Error:
return False
def _get_unique_index_name(self, base_name: str) -> str:
"""Return a unique SQLite index name based on base_name.
If base_name has not been used yet, it is returned as-is and recorded. If it has been
used, a numeric suffix is appended starting from 2 (e.g., name_2, name_3, ...), and the
chosen name is recorded as used. This behavior is only intended for cases where index
prefixing is not enabled and SQLite requires global uniqueness for index names.
"""
if base_name not in self._seen_sqlite_index_names:
self._seen_sqlite_index_names.add(base_name)
return base_name
# Base name already seen — assign next available counter
next_num = self._sqlite_index_name_counters.get(base_name, 2)
candidate = f"{base_name}_{next_num}"
while candidate in self._seen_sqlite_index_names:
next_num += 1
candidate = f"{base_name}_{next_num}"
# Record chosen candidate and bump counter for the base name
self._seen_sqlite_index_names.add(candidate)
self._sqlite_index_name_counters[base_name] = next_num + 1
self._logger.info(
'Index "%s" renamed to "%s" to ensure uniqueness across the SQLite database.',
base_name,
candidate,
)
return candidate
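    # Example sequence: _get_unique_index_name("idx") -> "idx", then "idx_2",
    # then "idx_3" on subsequent calls with the same base name.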
def _build_create_table_sql(self, table_name: str) -> str:
table_ident = self._quote_sqlite_identifier(table_name)
sql: str = f"CREATE TABLE IF NOT EXISTS {table_ident} ("
primary: str = ""
indices: str = ""
safe_table = self._escape_mysql_backticks(table_name)
self._mysql_cur_dict.execute(f"SHOW COLUMNS FROM `{safe_table}`")
rows: t.Sequence[t.Optional[t.Dict[str, RowItemType]]] = self._mysql_cur_dict.fetchall()
primary_keys: int = sum(1 for row in rows if row is not None and row["Key"] == "PRI")
for row in rows:
if row is not None:
column_type = self._translate_type_from_mysql_to_sqlite(
column_type=row["Type"], # type: ignore[arg-type]
sqlite_json1_extension_enabled=self._sqlite_json1_extension_enabled,
)
if row["Key"] == "PRI" and row["Extra"] == "auto_increment" and primary_keys == 1:
if column_type in Integer_Types:
sql += "\n\t{name} INTEGER PRIMARY KEY AUTOINCREMENT,".format(
name=self._quote_sqlite_identifier(
str(
row["Field"].decode()
if isinstance(row["Field"], (bytes, bytearray))
else row["Field"]
)
),
)
else:
self._logger.warning(
'Primary key "%s" in table "%s" is not an INTEGER type! Skipping.',
row["Field"],
table_name,
)
else:
sql += "\n\t{name} {type} {notnull} {default} {collation},".format(
name=self._quote_sqlite_identifier(
str(row["Field"].decode() if isinstance(row["Field"], (bytes, bytearray)) else row["Field"])
),
type=column_type,
notnull="NULL" if row["Null"] == "YES" else "NOT NULL",
default=self._translate_default_from_mysql_to_sqlite(row["Default"], column_type, row["Extra"]),
collation=self._data_type_collation_sequence(self._collation, column_type),
)
self._mysql_cur_dict.execute(
"""
SELECT s.INDEX_NAME AS `name`,
IF (NON_UNIQUE = 0 AND s.INDEX_NAME = 'PRIMARY', 1, 0) AS `primary`,
IF (NON_UNIQUE = 0 AND s.INDEX_NAME <> 'PRIMARY', 1, 0) AS `unique`,
{auto_increment}
GROUP_CONCAT(s.COLUMN_NAME ORDER BY SEQ_IN_INDEX) AS `columns`,
GROUP_CONCAT(c.COLUMN_TYPE ORDER BY SEQ_IN_INDEX) AS `types`
FROM information_schema.STATISTICS AS s
JOIN information_schema.COLUMNS AS c
ON s.TABLE_SCHEMA = c.TABLE_SCHEMA
AND s.TABLE_NAME = c.TABLE_NAME
AND s.COLUMN_NAME = c.COLUMN_NAME
WHERE s.TABLE_SCHEMA = %s
AND s.TABLE_NAME = %s
GROUP BY s.INDEX_NAME, s.NON_UNIQUE {group_by_extra}
""".format(
auto_increment=(
"IF (c.EXTRA = 'auto_increment', 1, 0) AS `auto_increment`,"
if primary_keys == 1
else "0 as `auto_increment`,"
),
group_by_extra=" ,c.EXTRA" if primary_keys == 1 else "",
),
(self._mysql_database, table_name),
)
mysql_indices: t.Sequence[t.Optional[t.Dict[str, RowItemType]]] = self._mysql_cur_dict.fetchall()
for index in mysql_indices:
if index is not None:
index_name: str
if isinstance(index["name"], bytes):
index_name = index["name"].decode()
elif isinstance(index["name"], str):
index_name = index["name"]
else:
index_name = str(index["name"])
# check if the index name collides with any table name
self._mysql_cur_dict.execute(
"""
SELECT COUNT(*) AS `count`
FROM information_schema.TABLES
WHERE TABLE_SCHEMA = %s
AND TABLE_NAME = %s
""",
(self._mysql_database, index_name),
)
collision: t.Optional[t.Dict[str, RowItemType]] = self._mysql_cur_dict.fetchone()
table_collisions: int = 0
if collision is not None:
table_collisions = int(collision["count"]) # type: ignore[arg-type]
columns: str = ""
if isinstance(index["columns"], bytes):
columns = index["columns"].decode()
elif isinstance(index["columns"], str):
columns = index["columns"]
types: str = ""
if isinstance(index["types"], bytes):
types = index["types"].decode()
elif isinstance(index["types"], str):
types = index["types"]
if len(columns) > 0:
if index["primary"] in {1, "1"}:
if (index["auto_increment"] not in {1, "1"}) or any(
self._translate_type_from_mysql_to_sqlite(
column_type=_type,
sqlite_json1_extension_enabled=self._sqlite_json1_extension_enabled,
)
not in Integer_Types
for _type in types.split(",")
):
primary += "\n\tPRIMARY KEY ({columns})".format(
columns=", ".join(
self._quote_sqlite_identifier(column.strip()) for column in columns.split(",")
)
)
else:
# Determine the SQLite index name, considering table name collisions and prefix option
proposed_index_name = (
f"{table_name}_{index_name}"
if (table_collisions > 0 or self._prefix_indices)
else index_name
)
# Ensure index name is unique across the whole SQLite database when prefixing is disabled
if not self._prefix_indices:
unique_index_name = self._get_unique_index_name(proposed_index_name)
else:
unique_index_name = proposed_index_name
unique_kw = "UNIQUE " if index["unique"] in {1, "1"} else ""
indices += """CREATE {unique}INDEX IF NOT EXISTS {name} ON {table} ({columns});""".format(
unique=unique_kw,
name=self._quote_sqlite_identifier(unique_index_name),
table=self._quote_sqlite_identifier(table_name),
columns=", ".join(
self._quote_sqlite_identifier(column.strip()) for column in columns.split(",")
),
)
sql += primary
sql = sql.rstrip(", ")
if not self._without_tables and not self._without_foreign_keys:
server_version: t.Optional[t.Tuple[int, ...]] = self._mysql.get_server_version()
self._mysql_cur_dict.execute(
"""
SELECT k.COLUMN_NAME AS `column`,
k.REFERENCED_TABLE_NAME AS `ref_table`,
k.REFERENCED_COLUMN_NAME AS `ref_column`,
c.UPDATE_RULE AS `on_update`,
c.DELETE_RULE AS `on_delete`
FROM information_schema.TABLE_CONSTRAINTS AS i
{JOIN} information_schema.KEY_COLUMN_USAGE AS k
ON i.CONSTRAINT_NAME = k.CONSTRAINT_NAME
AND i.TABLE_NAME = k.TABLE_NAME
{JOIN} information_schema.REFERENTIAL_CONSTRAINTS AS c
ON c.CONSTRAINT_NAME = i.CONSTRAINT_NAME
AND c.TABLE_NAME = i.TABLE_NAME
WHERE i.TABLE_SCHEMA = %s
AND i.TABLE_NAME = %s
AND i.CONSTRAINT_TYPE = %s
GROUP BY i.CONSTRAINT_NAME,
k.COLUMN_NAME,
k.REFERENCED_TABLE_NAME,
k.REFERENCED_COLUMN_NAME,
c.UPDATE_RULE,
c.DELETE_RULE
""".format(
JOIN=(
"JOIN"
if (server_version is not None and server_version[0] == 8 and server_version[2] > 19)
else "LEFT JOIN"
)
),
(self._mysql_database, table_name, "FOREIGN KEY"),
)
for foreign_key in self._mysql_cur_dict.fetchall():
if foreign_key is not None:
col = self._quote_sqlite_identifier(
foreign_key["column"].decode()
if isinstance(foreign_key["column"], (bytes, bytearray))
else str(foreign_key["column"]) # type: ignore[index]
)
ref_table = self._quote_sqlite_identifier(
foreign_key["ref_table"].decode()
if isinstance(foreign_key["ref_table"], (bytes, bytearray))
else str(foreign_key["ref_table"]) # type: ignore[index]
)
ref_col = self._quote_sqlite_identifier(
foreign_key["ref_column"].decode()
if isinstance(foreign_key["ref_column"], (bytes, bytearray))
else str(foreign_key["ref_column"]) # type: ignore[index]
)
on_update = str(foreign_key["on_update"] or "NO ACTION").upper() # type: ignore[index]
on_delete = str(foreign_key["on_delete"] or "NO ACTION").upper() # type: ignore[index]
sql += (
f",\n\tFOREIGN KEY({col}) REFERENCES {ref_table} ({ref_col}) "
f"ON UPDATE {on_update} "
f"ON DELETE {on_delete}"
)
sql += "\n)"
if self._sqlite_strict:
sql += " STRICT"
sql += ";\n"
sql += indices
return sql
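    # The returned script has roughly this shape (illustrative table, column,
    # and index names):
    #
    #     CREATE TABLE IF NOT EXISTS "users" (
    #         "id" INTEGER PRIMARY KEY AUTOINCREMENT,
    #         "email" VARCHAR(255) NOT NULL DEFAULT '',
    #         FOREIGN KEY("group_id") REFERENCES "groups" ("id")
    #             ON UPDATE NO ACTION ON DELETE NO ACTION
    #     );
    #     CREATE UNIQUE INDEX IF NOT EXISTS "email_idx" ON "users" ("email");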
def _create_table(self, table_name: str, attempting_reconnect: bool = False) -> None:
try:
if attempting_reconnect:
self._mysql.reconnect()
self._sqlite_cur.executescript(self._build_create_table_sql(table_name))
self._sqlite.commit()
except mysql.connector.Error as err:
if err.errno == errorcode.CR_SERVER_LOST:
if not attempting_reconnect:
self._logger.warning("Connection to MySQL server lost.\nAttempting to reconnect.")
self._create_table(table_name, True)
return
else:
self._logger.warning("Connection to MySQL server lost.\nReconnection attempt aborted.")
raise
self._logger.error(
"MySQL failed reading table definition from table %s: %s",
table_name,
err,
)
raise
except sqlite3.Error as err:
self._logger.error("SQLite failed creating table %s: %s", table_name, err)
raise
def _mysql_viewdef_to_sqlite(self, view_select_sql: str, view_name: str) -> str:
"""Convert a MySQL VIEW_DEFINITION (a SELECT ...) to a SQLite CREATE VIEW statement."""
# Normalize whitespace and avoid double semicolons in output
cleaned_sql = view_select_sql.strip().rstrip(";")
try:
tree: Expression = parse_one(cleaned_sql, read="mysql")
except (ParseError, ValueError, AttributeError, TypeError):
# Fallback: try to remove schema qualifiers if requested, then return
stripped_sql = cleaned_sql
# Remove qualifiers `schema`.tbl or "schema".tbl or schema.tbl
sn: str = re.escape(self._mysql_database)
for pat in (rf"`{sn}`\.", rf'"{sn}"\.', rf"\b{sn}\."):
stripped_sql = re.sub(pat, "", stripped_sql, flags=re.IGNORECASE)
view_ident = self._quote_sqlite_identifier(view_name)
return f"CREATE VIEW IF NOT EXISTS {view_ident} AS\n{stripped_sql};"
# Remove schema qualifiers that match schema_name on tables
for tbl in tree.find_all(exp.Table):
db = tbl.args.get("db")
if db and db.name.strip('`"').lower() == self._mysql_database.lower():
tbl.set("db", None)
# Also remove schema qualifiers on fully-qualified columns (db.table.column)
for col in tree.find_all(exp.Column):
db = col.args.get("db")
if db and db.name.strip('`"').lower() == self._mysql_database.lower():
col.set("db", None)
sqlite_select: str = tree.sql(dialect="sqlite")
view_ident = self._quote_sqlite_identifier(view_name)
return f"CREATE VIEW IF NOT EXISTS {view_ident} AS\n{sqlite_select};"
def _build_create_view_sql(self, view_name: str) -> str:
"""Build a CREATE VIEW statement for SQLite from a MySQL VIEW definition."""
# Try to obtain the view definition from information_schema.VIEWS
definition: t.Optional[str] = None
try:
self._mysql_cur_dict.execute(
"""
SELECT VIEW_DEFINITION AS `definition`
FROM information_schema.VIEWS
WHERE TABLE_SCHEMA = %s
AND TABLE_NAME = %s
""",
(self._mysql_database, view_name),
)
row: t.Optional[t.Dict[str, RowItemType]] = self._mysql_cur_dict.fetchone()
if row is not None and row.get("definition") is not None:
val = row["definition"]
if isinstance(val, bytes):
try:
definition = val.decode()
except UnicodeDecodeError:
definition = str(val)
else:
definition = t.cast(str, val)
except mysql.connector.Error:
# Fall back to SHOW CREATE VIEW below
definition = None
if not definition:
# Fallback: use SHOW CREATE VIEW and extract the SELECT part
try:
# Escape backticks in the MySQL view name for safe interpolation
safe_view_name = view_name.replace("`", "``")
self._mysql_cur.execute(f"SHOW CREATE VIEW `{safe_view_name}`")
res = self._mysql_cur.fetchone()
if res and len(res) >= 2:
create_stmt = res[1]
if isinstance(create_stmt, bytes):
try:
create_stmt_str = create_stmt.decode()
except UnicodeDecodeError:
create_stmt_str = str(create_stmt)
else:
create_stmt_str = t.cast(str, create_stmt)
# Extract the SELECT ... part after AS (supporting newlines)
m = re.search(r"\bAS\b\s*(.*)$", create_stmt_str, re.IGNORECASE | re.DOTALL)
if m:
definition = m.group(1).strip().rstrip(";")
else:
# As a last resort, try to use the full statement replacing the prefix
# Not ideal, but better than failing outright
idx = create_stmt_str.upper().find(" AS ")
if idx != -1:
definition = create_stmt_str[idx + 4 :].strip().rstrip(";")
except mysql.connector.Error:
pass
if not definition:
raise sqlite3.Error(f"Unable to fetch definition for MySQL view '{view_name}'")
return self._mysql_viewdef_to_sqlite(
view_name=view_name,
view_select_sql=definition,
)
def _create_view(self, view_name: str, attempting_reconnect: bool = False) -> None:
try:
if attempting_reconnect:
self._mysql.reconnect()
sql = self._build_create_view_sql(view_name)
self._sqlite_cur.execute(sql)
self._sqlite.commit()
except mysql.connector.Error as err:
if err.errno == errorcode.CR_SERVER_LOST:
if not attempting_reconnect:
self._logger.warning("Connection to MySQL server lost.\nAttempting to reconnect.")
self._create_view(view_name, True)
return
else:
self._logger.warning("Connection to MySQL server lost.\nReconnection attempt aborted.")
raise
self._logger.error(
"MySQL failed reading view definition from view %s: %s",
view_name,
err,
)
raise
except sqlite3.Error as err:
self._logger.error("SQLite failed creating view %s: %s", view_name, err)
raise
def _transfer_table_data(
self, table_name: str, sql: str, total_records: int = 0, attempting_reconnect: bool = False
) -> None:
if attempting_reconnect:
self._mysql.reconnect()
try:
if self._chunk_size is not None and self._chunk_size > 0:
for chunk in trange(
self._current_chunk_number,
int(ceil(total_records / self._chunk_size)),
disable=self._quiet,
):
self._current_chunk_number = chunk
self._sqlite_cur.executemany(
sql,
(
tuple(encode_data_for_sqlite(col) if col is not None else None for col in row)
for row in self._mysql_cur.fetchmany(self._chunk_size)
),
)
else:
self._sqlite_cur.executemany(
sql,
(
tuple(encode_data_for_sqlite(col) if col is not None else None for col in row)
for row in tqdm(
self._mysql_cur.fetchall(),
total=total_records,
disable=self._quiet,
)
),
)
self._sqlite.commit()
except mysql.connector.Error as err:
if err.errno == errorcode.CR_SERVER_LOST:
if not attempting_reconnect:
self._logger.warning("Connection to MySQL server lost.\nAttempting to reconnect.")
self._transfer_table_data(
table_name=table_name,
sql=sql,
total_records=total_records,
attempting_reconnect=True,
)
return
else:
self._logger.warning("Connection to MySQL server lost.\nReconnection attempt aborted.")
raise
self._logger.error(
"MySQL transfer failed reading table data from table %s: %s",
table_name,
err,
)
raise
except sqlite3.Error as err:
self._logger.error(
"SQLite transfer failed inserting data into table %s: %s",
table_name,
err,
)
raise
def transfer(self) -> None:
"""The primary and only method with which we transfer all the data."""
if len(self._mysql_tables) > 0 or len(self._exclude_mysql_tables) > 0:
# transfer only specific tables
specific_tables: t.Sequence[str] = (
self._exclude_mysql_tables if len(self._exclude_mysql_tables) > 0 else self._mysql_tables
)
self._mysql_cur_prepared.execute(
"""
SELECT TABLE_NAME, TABLE_TYPE
FROM information_schema.TABLES
WHERE TABLE_SCHEMA = SCHEMA()
AND TABLE_NAME {exclude} IN ({placeholders})
""".format(
exclude="NOT" if len(self._exclude_mysql_tables) > 0 else "",
placeholders=("%s, " * len(specific_tables)).rstrip(" ,"),
),
specific_tables,
)
tables: t.Iterable[t.Tuple[str, str]] = (
(
str(row[0].decode() if isinstance(row[0], (bytes, bytearray)) else row[0]),
str(row[1].decode() if isinstance(row[1], (bytes, bytearray)) else row[1]),
)
for row in self._mysql_cur_prepared.fetchall()
)
else:
# transfer all tables
self._mysql_cur.execute(
"""
SELECT TABLE_NAME, TABLE_TYPE
FROM information_schema.TABLES
WHERE TABLE_SCHEMA = SCHEMA()
"""
)
def _coerce_row(row: t.Any) -> t.Tuple[str, str]:
try:
# Row like (name, type)
name = row[0].decode() if isinstance(row[0], (bytes, bytearray)) else row[0]
ttype = (
row[1].decode()
if (isinstance(row, (list, tuple)) and len(row) > 1 and isinstance(row[1], (bytes, bytearray)))
else (row[1] if (isinstance(row, (list, tuple)) and len(row) > 1) else "BASE TABLE")
)
return str(name), str(ttype)
except (TypeError, IndexError, UnicodeDecodeError):
# Fallback: treat as a single value name when row is not a 2-tuple or decoding fails
name = row.decode() if isinstance(row, (bytes, bytearray)) else str(row)
return name, "BASE TABLE"
tables = (_coerce_row(row) for row in self._mysql_cur.fetchall())
try:
# turn off foreign key checking in SQLite while transferring data
self._sqlite_cur.execute("PRAGMA foreign_keys=OFF")
for table_name, table_type in tables:
if isinstance(table_name, bytes):
table_name = table_name.decode()
if isinstance(table_type, bytes):
table_type = table_type.decode()
self._logger.info(
"%s%sTransferring table %s",
"[WITHOUT DATA] " if self._without_data else "",
"[ONLY DATA] " if self._without_tables else "",
table_name,
)
# reset the chunk
self._current_chunk_number = 0
if not self._without_tables:
# create the table or view
if table_type == "VIEW" and self._views_as_views:
self._create_view(table_name) # type: ignore[arg-type]
else:
self._create_table(table_name) # type: ignore[arg-type]
if not self._without_data and not (table_type == "VIEW" and self._views_as_views):
# get the size of the data
if self._limit_rows > 0:
# limit to the requested number of rows
safe_table = self._escape_mysql_backticks(table_name)
self._mysql_cur_dict.execute(
"SELECT COUNT(*) AS `total_records` "
f"FROM (SELECT * FROM `{safe_table}` LIMIT {self._limit_rows}) AS `table`"
)
else:
# get all rows
safe_table = self._escape_mysql_backticks(table_name)
self._mysql_cur_dict.execute(f"SELECT COUNT(*) AS `total_records` FROM `{safe_table}`")
total_records: t.Optional[t.Dict[str, RowItemType]] = self._mysql_cur_dict.fetchone()
if total_records is not None:
total_records_count: int = int(total_records["total_records"]) # type: ignore[arg-type]
else:
total_records_count = 0
# only continue if there is anything to transfer
if total_records_count > 0:
# populate it
safe_table = self._escape_mysql_backticks(table_name)
self._mysql_cur.execute(
"SELECT * FROM `{table_name}` {limit}".format(
table_name=safe_table,
limit=f"LIMIT {self._limit_rows}" if self._limit_rows > 0 else "",
)
)
columns: t.Tuple[str, ...] = tuple(column[0] for column in self._mysql_cur.description) # type: ignore[union-attr]
# build the SQL string
sql = """
INSERT OR IGNORE
INTO "{table}" ({fields})
VALUES ({placeholders})
""".format(
table=table_name,
fields=('"{}", ' * len(columns)).rstrip(" ,").format(*columns),
placeholders=("?, " * len(columns)).rstrip(" ,"),
)
self._transfer_table_data(
table_name=table_name, # type: ignore[arg-type]
sql=sql,
total_records=total_records_count,
)
finally:
# re-enable foreign key checking once done transferring
self._sqlite_cur.execute("PRAGMA foreign_keys=ON")
if self._vacuum:
self._logger.info("Vacuuming created SQLite database file.\nThis might take a while.")
self._sqlite_cur.execute("VACUUM")
self._logger.info("Done!")