navicate的数据迁移

 新到的这家公司分库分表做的很多,一个库就有3000多张表.还有好几个库.调试bug定位数据的时候简直让人心肺骤停.总是怀疑刚刚滑过的表名是不是错过了.

image-20250607125856009

分组配置定位

日常使用过程中就慢慢新建了很多分组来归类.

image-20250607130331435

最近换电脑,要迁移navicate数据到新电脑上.

navicate只有数据库连接导出,辛辛苦苦积累的分组同步不过去.

image-20250607130630059

官方没有这个导出功能.找了很久也没找到这个配置文件在哪.

最终还是给我找到了

先导出连接,用notepad打开,可以看到一个个连接配置中有一个SettingsSavePath的路径

image-20250607132156774

打开这个路径,在范围附近找找.

如果是mac打开终端输入下面命令,直接跳转对应路径

open "/Users/didi/Library/Containers/com.navicat.NavicatPremium/Data/Library/Application Support/PremiumSoft CyberTech/Navicat CC/Common/Settings/0/0/MySQL/pop_Job01"

我的位置在

/Users/didi/Library/Containers/com.navicat.NavicatPremium/Data/Library/Application Support/PremiumSoft CyberTech/Navicat CC/Navicat Premium/Profiles/vgroup.json

这里会发现有个vgroup.json 文件.打开就可以发现全部是配置的分组信息

在新电脑里导入数据库链接后,先新建个分组以便提前生成vgroup.json文件.然后把旧电脑里面的vgroup.json文件直接拷进新电脑对应目录,再重启navicate就发现分组信息都有了.

image-20250607133807247


生成vgroup.json 文件

既然发现是json结构配置的分组信息,那就可以很方便自定义了,对于有规律的表不需要亲手一个一个归类表结构了.

下面python代码功能是:

给出多个数据库链接,group_name相同的归为同一组,自动遍历所有schema

找出形如 xxxxxxx_2025_04 xxxxxxx_2025的这种表,自动归组,并且把未来2年的相关表也加入自动分组中

把生成的vgroup.json文件拷贝进目录重启navicat就可以了.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
import pymysql
import json
import re
from typing import List, Dict
from collections import defaultdict
from datetime import datetime, timedelta


class NavicatGroupGenerator:
def __init__(self, generate_future_tables: bool = False):
self.config = {
"version": "1.1",
"vgroups": [],
"connections": []
}
self.group_connections = defaultdict(list)
self.generate_future_tables = generate_future_tables # 是否生成未来表的开关

def get_future_tables(self, table_name: str, pattern_type: str) -> List[str]:
"""生成未来两年的表名"""
future_tables = []
current_date = datetime.now()

if pattern_type == 'Y_M':
# 从表名中提取年月信息
match = re.search(r'(\d{4})_(\d{2})$', table_name)
if match:
max_year = int(match.group(1))
max_month = int(match.group(2))
base_name = table_name[:table_name.rindex('_2')]

# 找到当前最大的年月
current_max_date = datetime(max_year, max_month, 1)

# 生成未来两年的所有月份
for i in range(1, 25): # 24个月
future_date = current_max_date + timedelta(days=32 * i)
future_date = future_date.replace(day=1) # 确保是月初
future_table = f"{base_name}_{future_date.year}_{future_date.strftime('%m')}"
future_tables.append(future_table)

elif pattern_type == 'Y':
# 从表名中提取年份信息
match = re.search(r'(\d{4})$', table_name)
if match:
max_year = int(match.group(1))
base_name = table_name[:table_name.rindex('_2')]

# 生成未来两年的表名
for year in range(max_year + 1, max_year + 3):
future_table = f"{base_name}_{year}"
future_tables.append(future_table)

return future_tables

def find_max_date_table(self, tables: List[str], pattern_type: str) -> str:
"""找到最大日期的表"""
max_table = None
max_date = None

for table in tables:
if pattern_type == 'Y_M':
match = re.search(r'(\d{4})_(\d{2})$', table)
if match:
year = int(match.group(1))
month = int(match.group(2))
current_date = datetime(year, month, 1)
if max_date is None or current_date > max_date:
max_date = current_date
max_table = table
elif pattern_type == 'Y':
match = re.search(r'(\d{4})$', table)
if match:
year = int(match.group(1))
if max_date is None or year > max_date:
max_date = year
max_table = table

return max_table

def analyze_table_pattern(self, tables: List[str]) -> Dict[str, List[str]]:
patterns = {}
pattern_ym = re.compile(r'^(.+)_\d{4}_\d{2}$')
pattern_y = re.compile(r'^(.+)_\d{4}$')

for table in tables:
match_ym = pattern_ym.match(table)
match_y = pattern_y.match(table)

if match_ym:
base_name = f"{match_ym.group(1)}_Y_M"
if base_name not in patterns:
patterns[base_name] = []
patterns[base_name].append(table)
elif match_y:
base_name = f"{match_y.group(1)}_Y"
if base_name not in patterns:
patterns[base_name] = []
patterns[base_name].append(table)

# 如果开启了生成未来表的功能,为每个模式生成未来表
if self.generate_future_tables:
new_patterns = {}
for base_name, table_list in patterns.items():
pattern_type = 'Y_M' if base_name.endswith('_Y_M') else 'Y'
max_table = self.find_max_date_table(table_list, pattern_type)
if max_table:
future_tables = self.get_future_tables(max_table, pattern_type)
new_patterns[base_name] = sorted(list(set(table_list + future_tables)))
else:
new_patterns[base_name] = table_list
patterns = new_patterns

return patterns

def connect_database(self, host: str, port: int, user: str, password: str) -> pymysql.Connection:
return pymysql.connect(
host=host,
port=port,
user=user,
password=password
)

def get_all_databases(self, connection: pymysql.Connection) -> List[str]:
with connection.cursor() as cursor:
cursor.execute("SHOW DATABASES")
return [db[0] for db in cursor.fetchall()]

def get_all_tables(self, connection: pymysql.Connection, database: str) -> List[str]:
with connection.cursor() as cursor:
connection.select_db(database)
cursor.execute("SHOW TABLES")
return [table[0] for table in cursor.fetchall()]


def generate_config(self, connections_info: List[Dict]) -> None:
# 首先按组名整理连接
for conn_info in connections_info:
self.group_connections[conn_info["group_name"]].append({
"name": conn_info["conn_name"],
"type": "CONNECTION",
"server_type": "MYSQL"
})

# 生成顶层虚拟分组
for group_name, connections in self.group_connections.items():
self.config["vgroups"].append({
"vgroup_name": group_name,
"vgroup_type": "CONNECTION",
"items": connections
})

# 处理每个连接的数据库结构
for conn_info in connections_info:
try:
connection = self.connect_database(
conn_info['host'],
conn_info['port'],
conn_info['user'],
conn_info['password']
)

connection_config = {
"conn_name": conn_info["conn_name"],
"conn_type": "MYSQL",
"vgroups": [],
"catalogs": [{
"catalog_name": "default",
"vgroups": [],
"schemas": []
}]
}

# 获取所有数据库
databases = self.get_all_databases(connection)

for db in databases:
# 获取数据库中所有的表
tables = self.get_all_tables(connection, db)

# 分析并分组符合日期格式的表
table_groups = self.analyze_table_pattern(tables)

if table_groups: # 只有当有符合条件的表时才添加schema
schema_config = {
"schema_name": db,
"vgroups": []
}

# 为每个表组创建一个虚拟分组
for group_name, grouped_tables in table_groups.items():
schema_config["vgroups"].append({
"vgroup_name": group_name,
"vgroup_type": "TABLE",
"items": [{
"name": table,
"type": "TABLE"
} for table in grouped_tables]
})

connection_config["catalogs"][0]["schemas"].append(schema_config)

if connection_config["catalogs"][0]["schemas"]: # 只有当有schema时才添加连接配置
self.config["connections"].append(connection_config)

connection.close()

except Exception as e:
print(f"Error processing connection {conn_info['conn_name']}: {str(e)}")

def save_config(self, filename: str) -> None:
with open(filename, 'w', encoding='utf-8') as f:
json.dump(self.config, f, indent=4, ensure_ascii=False)


# 使用示例
if __name__ == "__main__":
# 数据库连接信息
connections_info = [
{
"group_name": "数据库链接组名",
"conn_name": "pop_Job01",
"host": "xxx.xxx.xxx.xxx",
"port": 3316,
"user": "username",
"password": "password"
},
{
"group_name": "数据库链接组名",
"conn_name": "pop_Job02",
"host": "xxx.xxx.xxx.xxx",
"port": 3326,
"user": "username",
"password": "password"
},
{
"group_name": "数据库链接组名",
"conn_name": "Self-job01",
"host": "xxx.xxx.xxx.xxx",
"port": 3416,
"user": "username",
"password": "password"
},
{
"group_name": "数据库链接组名2",
"conn_name": "xxxxxjdbc.driver",
"host": "xxx.xxx.xxx.xxx",
"port": 3307,
"user": "username",
"password": "password2"
}
]

#是否生成未来2年的表
generator = NavicatGroupGenerator(generate_future_tables=True)
generator.generate_config(connections_info)
generator.save_config('vgroup.json')
print("ok")


这是最终效果

image-20250607172206740


navicate的数据迁移
https://lililib.github.io/navicate的组配置文件转移/
作者
煨酒小童
发布于
2025年6月7日
许可协议