您的位置:首頁 > 軟件教程 > 教程 > 異構(gòu)數(shù)據(jù)源同步之表結(jié)構(gòu)同步 → 通過 jdbc 實現(xiàn),沒那么簡單

異構(gòu)數(shù)據(jù)源同步之表結(jié)構(gòu)同步 → 通過 jdbc 實現(xiàn),沒那么簡單

來源:好特整理 | 時間:2024-05-06 09:47:46 | 閱讀:101 |  標(biāo)簽: C   | 分享到:

開心一刻 今天坐沙發(fā)上看電視,旁邊的老婆拿著手機貼了過來 老婆:老公,這次出門旅游,機票我準(zhǔn)備買了哈 我:嗯 老婆:你、我、你爸媽、我爸媽,一共六張票 老婆:這上面還有意外保險,要不要買? 我:都特么團滅了,還買啥保險? 異構(gòu)數(shù)據(jù)源同步 概念介紹 數(shù)據(jù)源,不只是包含關(guān)系型數(shù)據(jù)庫,還包括 NoSQL、

開心一刻

今天坐沙發(fā)上看電視,旁邊的老婆拿著手機貼了過來
老婆:老公,這次出門旅游,機票我準(zhǔn)備買了哈
我:嗯
老婆:你、我、你爸媽、我爸媽,一共六張票
老婆:這上面還有意外保險,要不要買?
我:都特么團滅了,還買啥保險?

異構(gòu)數(shù)據(jù)源同步之表結(jié)構(gòu)同步 → 通過 jdbc 實現(xiàn),沒那么簡單

異構(gòu)數(shù)據(jù)源同步

概念介紹

  • 數(shù)據(jù)源,不只是包含關(guān)系型數(shù)據(jù)庫,還包括 NoSQL、數(shù)倉、中間件、ftp 等等,凡是有存儲功能的都算
  • 異構(gòu),兩端的數(shù)據(jù)源的結(jié)構(gòu)存在差異,比如列數(shù)不一致、列類型不一致等等
  • 同步,將源數(shù)據(jù)源的數(shù)據(jù)同步到目標(biāo)數(shù)據(jù)源,包括數(shù)據(jù)讀取、轉(zhuǎn)換和寫入過程

所以,異構(gòu)數(shù)據(jù)源同步就是指在不同類型或格式的數(shù)據(jù)源之間傳輸和同步數(shù)據(jù)的過程

同步策略

主要有兩種同步策略: 離線同步 實時同步 ,各有其特點和適用場景
但是,這些我今天都不講,就吊吊你們胃口

異構(gòu)數(shù)據(jù)源同步之表結(jié)構(gòu)同步 → 通過 jdbc 實現(xiàn),沒那么簡單

如果你們想了解,自己去查吧
今天我就要逆襲一把,將 離線同步 中的一個小配角轉(zhuǎn)正成主角!

表結(jié)構(gòu)同步

異構(gòu)數(shù)據(jù)源同步 整個主線劇情中, 數(shù)據(jù)同步 才是真正的主角

表結(jié)構(gòu)同步 只能算活不過三集的那種配角

但今天不拍主線劇情,我要拍個番外篇來重點講 表結(jié)構(gòu)同步 ,我是導(dǎo)演嘛,當(dāng)然我說了算

背景說明

主要是針對關(guān)系型數(shù)據(jù)庫,當(dāng)目標(biāo)數(shù)據(jù)源的表不存在時,則先在目標(biāo)數(shù)據(jù)源創(chuàng)建目標(biāo)表,然后進行數(shù)據(jù)的同步

比如:從 MySQL 的表 tbl_t1 同步到 SQL Server 的表 tbl_tt ,若 tbl_tt 不存在,則根據(jù) tbl_t1 的表結(jié)構(gòu)創(chuàng)建 tbl_tt

所以這里就涉及到表結(jié)構(gòu)的同步,也正是本文的主角!

如何實現(xiàn)

通過 jdbc 來實現(xiàn),具體實現(xiàn)步驟如下

  1. 通過 jdbc 獲取元數(shù)據(jù)信息:表元數(shù)據(jù)、列元數(shù)據(jù)、主鍵元數(shù)據(jù)、索引元數(shù)據(jù)

  2. 根據(jù)元數(shù)據(jù)拼接目標(biāo)表的建表 SQL

  3. 通過 jdbc ,根據(jù)建表 SQL,在目標(biāo)數(shù)據(jù)源創(chuàng)建目標(biāo)表

第 3 步實現(xiàn)比較容易,難得是第 1、2步
雖然前路坑很多,但你們不要慌,我已經(jīng)替你們趟掉很多了

異構(gòu)數(shù)據(jù)源同步之表結(jié)構(gòu)同步 → 通過 jdbc 實現(xiàn),沒那么簡單

我們以 MySQL 為例,假設(shè)我們庫 test 下有表 tbl_sync

CREATE TABLE `tbl_sync` (
  `c_bigint_auto` bigint unsigned NOT NULL AUTO_INCREMENT COMMENT 'bigint 類型',
  `c_bigint` bigint DEFAULT NULL COMMENT 'bigint 類型',
  `c_vachar` varchar(100) NOT NULL COMMENT 'varchar 類型',
  `c_char` char(32) NOT NULL COMMENT 'char 類型',
  `c_text` text NOT NULL COMMENT 'text 類型',
  `c_decimal_4` decimal(15,4) NOT NULL DEFAULT '5000.0000' COMMENT 'decimal 類型',
  `c_decimal_0` decimal(10,0) DEFAULT NULL COMMENT 'decimal 類型',
  `c_blob` blob COMMENT 'blob 類型',
  `c_bit` bit(1) DEFAULT NULL COMMENT 'bit 類型',
  `c_tinyint` tinyint DEFAULT NULL COMMENT 'tinyint 類型',
  `c_binary` binary(10) DEFAULT NULL COMMENT 'binary 類型',
  `c_float` float(13,0) DEFAULT NULL COMMENT 'float 類型',
  `c_double` double(23,0) DEFAULT NULL COMMENT 'double 類型',
  `c_varbinary` varbinary(20) DEFAULT NULL COMMENT 'varbinary 類型',
  `c_longblob` longblob COMMENT 'longblob 類型',
  `c_longtext` longtext COMMENT 'longtext 類型',
  `c_json` json DEFAULT NULL COMMENT 'json 類型',
  `c_date` date DEFAULT NULL COMMENT 'date 類型',
  `c_time` time(2) DEFAULT NULL COMMENT 'time 類型',
  `c_datetime` datetime(3) DEFAULT NULL COMMENT 'datetime 類型',
  `c_timestamp` timestamp(4) NULL DEFAULT NULL COMMENT 'timestamp 類型',
  `c_year` year DEFAULT NULL COMMENT 'year 類型',
  PRIMARY KEY (`c_vachar`,`c_char`,`c_bigint_auto`),
  UNIQUE KEY `uk_id` (`c_bigint_auto`),
  KEY `idx_name_salary` (`c_vachar`,`c_decimal_4`)
) COMMENT='包含各種類型列的同步表';

現(xiàn)在需要將其同步到另一個 MySQL obj_db

表元數(shù)據(jù)

表的元信息比較少,包括表名、表類型、表說明(表注釋)等,其他的,類似字符集、排序規(guī)則等,就繼承數(shù)據(jù)庫的

表名,我想你們都知道,也就是對應(yīng)上面的 tbl_sync

表說明(表注釋)你們肯定也知道,對應(yīng)上面的 包含各種類型列的同步表

那表類型是什么,你們還知道嗎?

異構(gòu)數(shù)據(jù)源同步之表結(jié)構(gòu)同步 → 通過 jdbc 實現(xiàn),沒那么簡單

我們通常說的表是狹義上的表,也就是 基本表 ,是最常見的表類型,用于存儲具有明確定義的列和數(shù)據(jù)類型的數(shù)據(jù)

tbl_sync 就是 基本表 ,但廣義上的表還包括 視圖 、 臨時表 、 系統(tǒng)表 等等

下文都是基于 基本表 ,大家需要注意這個前提

通過 jdbc 獲取 表元數(shù)據(jù),非常簡單,直接看代碼

Connection connection = dataSource.getConnection();
DatabaseMetaData databaseMetaData = connection.getMetaData();
ResultSet tableResultSet = databaseMetaData.getTables(connection.getCatalog(), connection.getSchema(),
		"tbl_sync", new String[]{"TABLE"});
while (tableResultSet.next()) {
	System.out.println("tableCatalog = " + tableResultSet.getString("TABLE_CAT"));
	System.out.println("tableSchema = " + tableResultSet.getString("TABLE_SCHEM"));
	System.out.println("tableName = " + tableResultSet.getString("TABLE_NAME"));
	System.out.println("tableType = " + tableResultSet.getString("TABLE_TYPE"));
	System.out.println("remarks = " + tableResultSet.getString("REMARKS"));
}

輸出結(jié)果

tableCatalog = test
tableSchema = null
tableName = tbl_sync
tableType = TABLE
remarks = 包含各種類型列的同步表

一般我們只需要關(guān)注: TABLE_NAME 、 TABLE_TYPE 、 REMARKS

我們看下 java.sql.DatabaseMetaData#getTables 說明

點擊查看代碼
/**
 * Retrieves a description of the tables available in the given catalog.
 * Only table descriptions matching the catalog, schema, table
 * name and type criteria are returned.  They are ordered by
 * TABLE_TYPE, TABLE_CAT,
 * TABLE_SCHEM and TABLE_NAME.
 * 

* Each table description has the following columns: *

    *
  1. TABLE_CAT String {@code =>} table catalog (may be null) *
  2. TABLE_SCHEM String {@code =>} table schema (may be null) *
  3. TABLE_NAME String {@code =>} table name *
  4. TABLE_TYPE String {@code =>} table type. Typical types are "TABLE", * "VIEW", "SYSTEM TABLE", "GLOBAL TEMPORARY", * "LOCAL TEMPORARY", "ALIAS", "SYNONYM". *
  5. REMARKS String {@code =>} explanatory comment on the table *
  6. TYPE_CAT String {@code =>} the types catalog (may be null) *
  7. TYPE_SCHEM String {@code =>} the types schema (may be null) *
  8. TYPE_NAME String {@code =>} type name (may be null) *
  9. SELF_REFERENCING_COL_NAME String {@code =>} name of the designated * "identifier" column of a typed table (may be null) *
  10. REF_GENERATION String {@code =>} specifies how values in * SELF_REFERENCING_COL_NAME are created. Values are * "SYSTEM", "USER", "DERIVED". (may be null) *
* *

Note: Some databases may not return information for * all tables. * * @param catalog a catalog name; must match the catalog name as it * is stored in the database; "" retrieves those without a catalog; * null means that the catalog name should not be used to narrow * the search * @param schemaPattern a schema name pattern; must match the schema name * as it is stored in the database; "" retrieves those without a schema; * null means that the schema name should not be used to narrow * the search * @param tableNamePattern a table name pattern; must match the * table name as it is stored in the database * @param types a list of table types, which must be from the list of table types * returned from {@link #getTableTypes},to include; null returns * all types * @return ResultSet - each row is a table description * @exception SQLException if a database access error occurs * @see #getSearchStringEscape */

相信你們都能看懂,我只強調(diào)下 TABLE_TYPE

其值包括

  • TABLE
  • VIEW
  • SYSTEM TABLE
  • GLOBAL TEMPORARY,LOCAL TEMPORARY
  • ALIAS
  • SYNONYM

列元數(shù)據(jù)

列元信息比較多一點,包括列名、列類型、列類型名、是否自增、是否允許NULL、列大小、小數(shù)位數(shù)、默認值、列說明(列注釋)等

通過 jdbc 獲取 列元數(shù)據(jù) 也很簡單,直接看代碼

Connection connection = dataSource.getConnection();
DatabaseMetaData databaseMetaData = connection.getMetaData();
ResultSet columnResultSet = databaseMetaData.getColumns(connection.getCatalog(), connection.getSchema(),
		"tbl_sync", null);
while (columnResultSet.next()) {
	System.out.println("ColumnName = " + columnResultSet.getString("COLUMN_NAME"));
	System.out.println("ColumnType = " + columnResultSet.getInt("DATA_TYPE"));
	System.out.println("ColumnTypeName = " + columnResultSet.getString("TYPE_NAME"));
	System.out.println("isAutoIncrement = " + columnResultSet.getString("IS_AUTOINCREMENT"));
	System.out.println("isNullable = " + columnResultSet.getString("IS_NULLABLE"));
	System.out.println("Precision = " + columnResultSet.getInt("COLUMN_SIZE"));
	System.out.println("Scale = " + columnResultSet.getInt("DECIMAL_DIGITS"));
	System.out.println("DefaultValue = " + columnResultSet.getString("COLUMN_DEF"));
	System.out.println("Remarks = " + columnResultSet.getString("REMARKS"));
	System.out.println("===================================");
}

輸出結(jié)果

ColumnName = c_bigint_auto
ColumnType = -5
ColumnTypeName = BIGINT UNSIGNED
isAutoIncrement = YES
isNullable = NO
Precision = 20
Scale = 0
DefaultValue = null
Remarks = bigint 類型
===================================
ColumnName = c_bigint
ColumnType = -5
ColumnTypeName = BIGINT
isAutoIncrement = NO
isNullable = YES
Precision = 19
Scale = 0
DefaultValue = null
Remarks = bigint 類型
===================================
ColumnName = c_vachar
ColumnType = 12
ColumnTypeName = VARCHAR
isAutoIncrement = NO
isNullable = NO
Precision = 100
Scale = 0
DefaultValue = null
Remarks = varchar 類型
===================================
...

ColumnType 的值是 java.sql.Types -5 即是 java.sql.Types#BIGINT

異構(gòu)數(shù)據(jù)源同步之表結(jié)構(gòu)同步 → 通過 jdbc 實現(xiàn),沒那么簡單

那是不是根據(jù) ColumnType 就可以推斷出數(shù)據(jù)庫列類型了?

我們看下如下輸出

ColumnName = c_longtext
ColumnType = -1
ColumnTypeName = LONGTEXT
isAutoIncrement = NO
isNullable = YES
Precision = 2147483647
Scale = 0
DefaultValue = null
Remarks = longtext 類型
===================================
ColumnName = c_json
ColumnType = -1
ColumnTypeName = JSON
isAutoIncrement = NO
isNullable = YES
Precision = 1073741824
Scale = 0
DefaultValue = null
Remarks = json 類型
===================================

ColumnType = -1 是對應(yīng) LONGTEXT ,還是對應(yīng) JSON ?

我們再看一個

ColumnName = c_datetime
ColumnType = 93
ColumnTypeName = DATETIME
isAutoIncrement = NO
isNullable = YES
Precision = 23
Scale = 0
DefaultValue = null
Remarks = datetime 類型
===================================
ColumnName = c_timestamp
ColumnType = 93
ColumnTypeName = TIMESTAMP
isAutoIncrement = NO
isNullable = YES
Precision = 24
Scale = 0
DefaultValue = null
Remarks = timestamp 類型
===================================

ColumnType = 93 是對應(yīng) DATETIME ,還是對應(yīng) TIMESTAMP ?

這說明不能通過 java.sql.Types 精準(zhǔn)確認列的數(shù)據(jù)庫類型!!

那怎么辦?

我相信你們已經(jīng)看到了列的另一個元數(shù)據(jù): ColumnTypeName

它不就是 源數(shù)據(jù)源 中列列類型嗎?

比如列 c_timestamp 的類型不就是 TIMESTAMP 嗎,絲毫不差,準(zhǔn)確的很!

但是我們不能忘了我們的初衷:拼接目標(biāo)表的建表 SQL

通過 ColumnTypeName 能不能對應(yīng)到目標(biāo)表的列類型?

直接使用,肯定是不行的,關(guān)系型數(shù)據(jù)庫之間的類型不是完全一一對應(yīng)的,比如 MySQL DATETIME , Oracle 是沒有的

那可不可以通過 ColumnTypeName 來映射了,比如 DATETIME 映射到 Oracle DATE

理論上來說是可行的,但是,問題又來了!

我們是通過 jdbc 來完成映射的,它只提供了 int 類型的 java.sql.Types ,并未提供 String 類型的 java.sql.Types

莫非你要自實現(xiàn) String 類型的 java.sql.Types ? 你窮舉的過來嗎?

異構(gòu)數(shù)據(jù)源同步之表結(jié)構(gòu)同步 → 通過 jdbc 實現(xiàn),沒那么簡單

所以我們需要根據(jù) java.sql.Types 對源數(shù)據(jù)源的列類型最大兼容性獲取,而不是百分之百的精準(zhǔn)獲取

例如: java.sql.Types#LONGVARCHAR 就當(dāng)作列類型 LONGTEXT ,然后向目標(biāo)數(shù)據(jù)源映射

如果想更精準(zhǔn),則再結(jié)合 ColumnTypeName 的值向目標(biāo)數(shù)據(jù)源映射

總之一句話: ColumnType 主導(dǎo), ColumnTypeName 輔助,完成目標(biāo)數(shù)據(jù)源列映射

java.sql.DatabaseMetaData#getColumns 能獲取的元數(shù)據(jù)不局限于上述示例中的那些

大家可以去看下其源碼注釋,因為太長了,我就不貼了,我們重點看下 COLUMN_SIZE

* The COLUMN_SIZE column specifies the column size for the given column.
* For numeric data, this is the maximum precision.  For character data, this is the length in characters.
* For datetime datatypes, this is the length in characters of the String representation (assuming the
* maximum allowed precision of the fractional seconds component). For binary data, this is the length in bytes.  For the ROWID datatype,
* this is the length in bytes. Null is returned for data types where the
* column size is not applicable.

我給你們逐行翻譯下

/**
 *  For numeric data, this is the maximum precision => 對于數(shù)值數(shù)據(jù),表示最大精度
 *  For character data, this is the length in characters => 對于字符數(shù)據(jù),表示字符長度
 *  For datetime datatypes, this is the length in characters of the String representation(assuming the maximum allowed precision of the fractional seconds component )
 *      => 對于日期時間數(shù)據(jù)類型,表示字符串表示形式的最大長度(假設(shè)最大允許的分秒小數(shù)部分的精度)
 *      例如:"2024-04-30 14:00:00" => 19,"2024-04-30 14:00:00.234" => 23
 *      "14:00:00" => 8,"14:00:00.234" => 11
 *  For binary data, this is the length in bytes => 對于二進制數(shù)據(jù),表示字節(jié)長度
 *  For the ROWID datatype, this is the length in bytes => 對于 ROWID 類型,表示字節(jié)長度
 *  0 is returned for data types where the column size is not applicable => 對于列大小不適用的數(shù)據(jù)類型,返回0
 */

主鍵元數(shù)據(jù)

主鍵元信息就比較少了,我們一般只關(guān)注主鍵名、列名、列序號

通過 jdbc 代碼獲取,示例代碼如下

Connection connection = dataSource.getConnection();
DatabaseMetaData databaseMetaData = connection.getMetaData();
ResultSet primaryKeysResultSet = databaseMetaData.getPrimaryKeys(connection.getCatalog(), connection.getSchema(), "tbl_sync");
while (primaryKeysResultSet.next()) {
	String columnName = primaryKeysResultSet.getString("COLUMN_NAME");
	short keySeq = primaryKeysResultSet.getShort("KEY_SEQ");
	String pkName = primaryKeysResultSet.getString("PK_NAME");
	System.out.println(columnName + " - " + keySeq + " - " + pkName);
}

輸出結(jié)果

c_vachar - 1 - PRIMARY
c_char - 2 - PRIMARY
c_bigint_auto - 3 - PRIMARY

不用過多說明了吧,你們肯定都能看懂

索引元數(shù)據(jù)

與主鍵元數(shù)據(jù)類似,關(guān)注的元數(shù)據(jù)主要包括索引名、列名、列序號,同時多了一個 是否非唯一

通過 jdbc 獲取,代碼如下

Connection connection = dataSource.getConnection();
DatabaseMetaData databaseMetaData = connection.getMetaData();
ResultSet indexResultSet = databaseMetaData.getIndexInfo(connection.getCatalog(), connection.getSchema(), "tbl_sync", false, false);
while (indexResultSet.next()) {
	String indexName = indexResultSet.getString("INDEX_NAME");
	String columnName = indexResultSet.getString("COLUMN_NAME");
	boolean nonUnique = indexResultSet.getBoolean("NON_UNIQUE");
	short ordinalPosition = indexResultSet.getShort("ORDINAL_POSITION");
	System.out.println(columnName + " - " + ordinalPosition + " - " + indexName +  " - " + nonUnique);
}

輸出結(jié)果

c_vachar - 1 - PRIMARY - false
c_char - 2 - PRIMARY - false
c_bigint_auto - 3 - PRIMARY - false
c_bigint_auto - 1 - uk_id - false
c_vachar - 1 - idx_name_salary - true
c_decimal_4 - 2 - idx_name_salary - true

建表 SQL

當(dāng)相關(guān)元數(shù)據(jù)都獲取到之后,就萬事俱備,只欠東風(fēng)了

異構(gòu)數(shù)據(jù)源同步之表結(jié)構(gòu)同步 → 通過 jdbc 實現(xiàn),沒那么簡單

我們將 test 庫下的表 tbl_sync 同步到另一個 MySQL obj_db

SQL 拼接如下

點擊查看代碼
public String getCreateTableSql(String schemaName, TableMeta tableMeta, List columnMetas,
								IndexMeta primaryKeyMeta, Map indexMetaMap) {
	StringBuilder createSql = new StringBuilder("CREATE TABLE " + schemaName + "." + tableMeta.getTableName() + " ( ");
	for (ColumnMeta columnMeta : columnMetas) {
		createSql.append(columnMeta.getColumnName()).append(" ").append(getColumnType(columnMeta));
		if (columnMeta.getIfUnsigned()) {
			createSql.append(" UNSIGNED");
		}
		if (columnMeta.getIfNullable() == 0) {
			createSql.append(" NOT NULL");
		}
		if (StrUtil.isNotBlank(columnMeta.getDefaultValue())) {
			createSql.append(" DEFAULT '").append(columnMeta.getDefaultValue()).append("'");
		}
		if (columnMeta.getIfAutoIncrement()) {
			createSql.append(" AUTO_INCREMENT");
		}
		if (StrUtil.isNotBlank(columnMeta.getRemarks())) {
			createSql.append(" COMMENT '").append(columnMeta.getRemarks()).append("'");
		}
		createSql.append(",");
	}
	// 主鍵處理
	if (ObjectUtil.isNotNull(primaryKeyMeta)) {
		List indexColumns = primaryKeyMeta.getIndexColumns();
		indexColumns.sort(Comparator.comparingInt(IndexColumnMeta::getOrdinalPosition));
		createSql.append(" PRIMARY KEY (");
		for (int i=0; i0) {
				createSql.append(",");
			}
			createSql.append(indexColumns.get(i).getColumnName());
		}
		createSql.append("),");
	}
	if (CollectionUtil.isNotEmpty(indexMetaMap)) {
		for (IndexMeta indexMeta : indexMetaMap.values()) {
			if (indexMeta.getIndexType() == IndexTypeEnum.UNIQUE) {
				// 唯一索引
				createSql.append("UNIQUE ");
			}
			createSql.append("KEY ").append(indexMeta.getIndexName()).append(" (");
			List indexColumns = indexMeta.getIndexColumns();
			indexColumns.sort(Comparator.comparingInt(IndexColumnMeta::getOrdinalPosition));
			for (int i=0; i0) {
					createSql.append(",");
				}
				createSql.append(indexColumns.get(i).getColumnName());
			}
			createSql.append("),");
		}
	}
	// 刪除最后一個逗號
	createSql.deleteCharAt(createSql.length()-1);
	createSql.append(")");
	if (StrUtil.isNotBlank(tableMeta.getRemarks())) {
		createSql.append(" COMMENT '").append(tableMeta.getRemarks()).append("'");
	}
	return createSql.toString();
}

/**
 * 獲取表 列類型
 * @param columnMeta 列元數(shù)據(jù)
 * @return mysql 列類型
 */
private String getColumnType(ColumnMeta columnMeta) {
	switch (columnMeta.getColumnType()) {
		// 數(shù)值類型
		case Types.TINYINT:
			return "TINYINT";
		case Types.SMALLINT:
			return "SMALLINT";
		case Types.INTEGER:
			return "INT";
		case Types.BIGINT:
			return "BIGINT";
		case Types.FLOAT:
		case Types.REAL:
			return columnMeta.getPrecision() > 0 ? "FLOAT(" + columnMeta.getPrecision() + "," + columnMeta.getScale() + ")" : "FLOAT";
		case Types.DOUBLE:
			return columnMeta.getPrecision() > 0 ? "DOUBLE(" + columnMeta.getPrecision() + "," + columnMeta.getScale() + ")" : "DOUBLE";
		case Types.DECIMAL:
			return "DECIMAL(" + columnMeta.getPrecision() + "," + columnMeta.getScale() + ")";
		case Types.NUMERIC:
			return columnMeta.getScale() <= 0 ? "BIGINT" : "DECIMAL(" + columnMeta.getPrecision() + "," + columnMeta.getScale() + ")";
		// 字符與字符串類型
		case Types.CHAR:
		case Types.NCHAR:
			return columnMeta.getPrecision() > 0 ? "CHAR(" + columnMeta.getPrecision() + ")" : "CHAR";
		case Types.VARCHAR:
		case Types.NVARCHAR:
			return columnMeta.getPrecision() > 0 ? "VARCHAR(" + columnMeta.getPrecision() + ")" : "VARCHAR";
		case Types.LONGVARCHAR:
		case Types.LONGNVARCHAR:
			switch (columnMeta.getColumnTypeName()) {
				case "TINYTEXT":
					return "TINYTEXT";
				case "MEDIUMTEXT":
					return "MEDIUMTEXT";
				case "LONGTEXT":
					return "LONGTEXT";
				case "JSON":
					return "JSON";
				default:
					return "TEXT";
			}
		case Types.CLOB:
		case Types.NCLOB:
			return "LONGTEXT";
		// 日期和時間類型
		case Types.DATE:
			switch (columnMeta.getColumnTypeName()) {
				case "YEAR":
					return "YEAR";
				default:
					return "DATE";
			}
		case Types.TIME:
			return "TIME" + (columnMeta.getPrecision() > 8 ? "(" + (columnMeta.getPrecision() - 9) + ")" : "");
		case Types.TIMESTAMP:
			switch (columnMeta.getColumnTypeName()) {
				case "DATETIME":
					return "DATETIME" + (columnMeta.getPrecision() > 19 ? "(" + (columnMeta.getPrecision() - 20) + ")" : "");
				case "DATE":
					// oracle 的 DATE
					return "DATETIME";
				default:
					return "TIMESTAMP"+ (columnMeta.getPrecision() > 19 ? "(" + (columnMeta.getPrecision() - 20) + ")" : "");
			}
		// 二進制類型
		case Types.BIT:
		case Types.BOOLEAN:
			return columnMeta.getPrecision() > 0 ? "BIT(" + columnMeta.getPrecision() + ")" : "BIT";
		case Types.BINARY:
			return columnMeta.getPrecision() > 0 ? "BINARY(" + columnMeta.getPrecision() + ")" : "BINARY";
		case Types.VARBINARY:
			return columnMeta.getPrecision() > 0 ? "VARBINARY(" + columnMeta.getPrecision() + ")" : "VARBINARY";
		case Types.BLOB:
		case Types.LONGVARBINARY:
			switch (columnMeta.getColumnTypeName()) {
				case "TINYBLOB":
					return "TINYBLOB";
				case "MEDIUMBLOB":
					return "MEDIUMBLOB";
				case "LONGBLOB":
					return "LONGBLOB";
				default:
					return "BLOB";
			}
		case Types.OTHER:
			if (columnMeta.getColumnTypeName().contains("VARCHAR")) {
				return "VARCHAR" + (columnMeta.getPrecision() > 0 ? "(" + columnMeta.getPrecision() + ")" : "");
			} else if (columnMeta.getColumnTypeName().contains("TIMESTAMP")) {
				return "TIMESTAMP" + (columnMeta.getScale() > 0 ? "(" + columnMeta.getScale() + ")" : "");
			}
			else {
				throw new SyncException("不支持的類型:" + columnMeta.getColumnTypeName());
			}
		default:
			throw new SyncException("不支持的類型:" + columnMeta.getColumnTypeName());
	}
}

結(jié)合元數(shù)據(jù)的獲取

點擊查看代碼
@Test
public void getMySQLCreateTableSql() throws SQLException {
	Connection connection = dataSource.getConnection();
	DatabaseMetaData databaseMetaData = connection.getMetaData();
	ResultSet tableResultSet = databaseMetaData.getTables(connection.getCatalog(), connection.getSchema(), "tbl_sync", new String[]{"TABLE"});
	TableMeta tableMeta = new TableMeta();
	while (tableResultSet.next()) {
		tableMeta.setTableName(tableResultSet.getString("TABLE_NAME"));
		tableMeta.setTableType(tableResultSet.getString("TABLE_TYPE"));
		tableMeta.setRemarks(tableResultSet.getString("REMARKS"));
	}
	// 獲取列元數(shù)據(jù)
	ResultSet columnResultSet = databaseMetaData.getColumns(connection.getCatalog(), connection.getSchema(), "tbl_sync", null);
	List columnMetas = new ArrayList<>();
	while (columnResultSet.next()) {
		ColumnMeta columnMeta = new ColumnMeta();
		columnMeta.setColumnName(columnResultSet.getString("COLUMN_NAME"));
		columnMeta.setColumnType(columnResultSet.getInt("DATA_TYPE"));
		columnMeta.setColumnTypeName(columnResultSet.getString("TYPE_NAME"));
		columnMeta.setIfAutoIncrement("YES".equalsIgnoreCase(columnResultSet.getString("IS_AUTOINCREMENT")));
		columnMeta.setIfNullable("YES".equalsIgnoreCase(columnResultSet.getString("IS_NULLABLE")) ? 1 : 0);
		columnMeta.setPrecision(columnResultSet.getInt("COLUMN_SIZE"));
		columnMeta.setScale(columnResultSet.getInt("DECIMAL_DIGITS"));
		columnMeta.setDefaultValue(columnResultSet.getString("COLUMN_DEF"));
		columnMeta.setRemarks(columnResultSet.getString("REMARKS"));
		columnMeta.setIfUnsigned(columnMeta.getColumnTypeName().contains("UNSIGNED"));
		columnMetas.add(columnMeta);
	}
	columnResultSet.close();
	// 獲取主鍵元數(shù)據(jù)
	ResultSet primaryKeyResultSet = databaseMetaData.getPrimaryKeys(connection.getCatalog(), connection.getSchema(), "tbl_sync");
	IndexMeta primaryKeyMeta = new IndexMeta();
	while (primaryKeyResultSet.next()) {
		IndexColumnMeta indexColumnMeta = new IndexColumnMeta(primaryKeyResultSet.getString("COLUMN_NAME"), primaryKeyResultSet.getShort("KEY_SEQ"));
		primaryKeyMeta.setIndexName(primaryKeyResultSet.getString("PK_NAME"));
		primaryKeyMeta.getIndexColumns().add(indexColumnMeta);
	}
	primaryKeyResultSet.close();
	// 獲取索引元數(shù)據(jù)
	ResultSet indexResultSet = databaseMetaData.getIndexInfo(connection.getCatalog(), connection.getSchema(), "tbl_sync", false, false);
	Map indexMetaMap = new HashMap<>();
	while (indexResultSet.next()) {
		String indexName = indexResultSet.getString("INDEX_NAME");
		if (indexName.equals(primaryKeyMeta.getIndexName())) {
			continue;
		}
		IndexMeta indexMeta = indexMetaMap.get(indexName);
		if (ObjectUtil.isNull(indexMeta)) {
			indexMeta = new IndexMeta(indexName);
			indexMetaMap.put(indexName, indexMeta);
		}
		indexMeta.setIndexType(indexResultSet.getBoolean("NON_UNIQUE") ? IndexTypeEnum.NORMAL : IndexTypeEnum.UNIQUE);
		indexMeta.getIndexColumns().add(new IndexColumnMeta(indexResultSet.getString("COLUMN_NAME"), indexResultSet.getShort("ORDINAL_POSITION")));
	}
	indexResultSet.close();

	MysqlSql mysqlSql = new MysqlSql();
	String createTableSql = mysqlSql.getCreateTableSql("obj_db", tableMeta, columnMetas, primaryKeyMeta, indexMetaMap);
	System.out.println(SQLUtils.formatMySql(createTableSql));
}

得到的建表 SQL 如下

CREATE TABLE obj_db.tbl_sync (
	c_bigint_auto BIGINT UNSIGNED NOT NULL AUTO_INCREMENT COMMENT 'bigint 類型',
	c_bigint BIGINT COMMENT 'bigint 類型',
	c_vachar VARCHAR(100) NOT NULL COMMENT 'varchar 類型',
	c_char CHAR(32) NOT NULL COMMENT 'char 類型',
	c_text TEXT NOT NULL COMMENT 'text 類型',
	c_decimal_4 DECIMAL(15, 4) NOT NULL DEFAULT '5000.0000' COMMENT 'decimal 類型',
	c_decimal_0 DECIMAL(10, 0) COMMENT 'decimal 類型',
	c_blob BLOB COMMENT 'blob 類型',
	c_bit BIT(1) COMMENT 'bit 類型',
	c_tinyint TINYINT COMMENT 'tinyint 類型',
	c_binary BINARY(10) COMMENT 'binary 類型',
	c_float FLOAT(13, 0) COMMENT 'float 類型',
	c_double DOUBLE(23, 0) COMMENT 'double 類型',
	c_varbinary VARBINARY(20) COMMENT 'varbinary 類型',
	c_longblob LONGBLOB COMMENT 'longblob 類型',
	c_longtext LONGTEXT COMMENT 'longtext 類型',
	c_json JSON COMMENT 'json 類型',
	c_date DATE COMMENT 'date 類型',
	c_time TIME(2) COMMENT 'time 類型',
	c_datetime DATETIME(3) COMMENT 'datetime 類型',
	c_timestamp TIMESTAMP(4) COMMENT 'timestamp 類型',
	c_year YEAR COMMENT 'year 類型',
	PRIMARY KEY (c_vachar, c_char, c_bigint_auto),
	UNIQUE KEY uk_id (c_bigint_auto),
	KEY idx_name_salary (c_vachar, c_decimal_4)
) COMMENT '包含各種類型列的同步表'

異構(gòu)數(shù)據(jù)源同步之表結(jié)構(gòu)同步 → 通過 jdbc 實現(xiàn),沒那么簡單

可以看出,與原表的結(jié)構(gòu)是一致的!

此處應(yīng)該有掌聲

異構(gòu)數(shù)據(jù)源同步之表結(jié)構(gòu)同步 → 通過 jdbc 實現(xiàn),沒那么簡單

同源同步

何謂同源?

就是數(shù)據(jù)庫類型相同的數(shù)據(jù)源,例如從 MySQL 同步到 MySQL

這種情況還有必要進行 SQL 拼接嗎?

異構(gòu)數(shù)據(jù)源同步之表結(jié)構(gòu)同步 → 通過 jdbc 實現(xiàn),沒那么簡單

還記得怎么查看 MySQL 表的完整定義嗎

SHOW CREATE TABLE test.tbl_sync

這是不是就可以獲取到表的 DDL

所以同源的表結(jié)構(gòu)同步,就不用拼接 SQL 那么復(fù)雜了,直接獲取 DDL 后在目標(biāo)數(shù)據(jù)源建表即可

總結(jié)

  • 異構(gòu)數(shù)據(jù)源同步的策略有兩種:離線同步 和 實時同步,各自的特點及使用場景需要區(qū)分清楚
  • 關(guān)系型數(shù)據(jù)庫的元數(shù)據(jù)有很多種,大家可以仔細看看 java.sql.DatabaseMetaData
  • 同源表結(jié)構(gòu)同步,可以不用拼接建表 SQL,可以直接獲取建表 DDL
  • 異源表結(jié)構(gòu)同步,需要先獲取源表的相關(guān)元數(shù)據(jù),然后再拼接目標(biāo)表的建表 SQL,最后在目標(biāo)數(shù)據(jù)源執(zhí)行 SQL 創(chuàng)建目標(biāo)表
  • COLUMN_SIZE 針對不同的列類型,它的含義不同,文中已經(jīng)詳細說明,值得大家注意
小編推薦閱讀

好特網(wǎng)發(fā)布此文僅為傳遞信息,不代表好特網(wǎng)認同期限觀點或證實其描述。

相關(guān)視頻攻略

更多

掃二維碼進入好特網(wǎng)手機版本!

掃二維碼進入好特網(wǎng)微信公眾號!

本站所有軟件,都由網(wǎng)友上傳,如有侵犯你的版權(quán),請發(fā)郵件[email protected]

湘ICP備2022002427號-10 湘公網(wǎng)安備:43070202000427號© 2013~2025 haote.com 好特網(wǎng)