-
-
Notifications
You must be signed in to change notification settings - Fork 45
Expand file tree
/
Copy path_helper_methods.py
More file actions
321 lines (231 loc) · 13.2 KB
/
_helper_methods.py
File metadata and controls
321 lines (231 loc) · 13.2 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
def get_item_info(self, item_type,
                  item_id=None, item_name=None,
                  collection_id=None, collection_name=None,
                  params=None):
    '''
    Return the info for the given item.
    Use 'params' for providing arguments. E.g. to include tables in the result for databases, use: params={'include':'tables'}

    The item is addressed by item_id; when no id is given, item_name is resolved
    to an id through get_item_id (collection_id/collection_name are forwarded to
    that lookup).

    Raises ValueError when neither id nor name is provided, or when the request
    to "/api/<item_type>/<item_id>" returns a falsy result.
    '''
    assert item_type in ['database', 'table', 'card', 'collection', 'dashboard', 'pulse', 'segment']

    if params:
        # isinstance instead of an exact type comparison, so that dict subclasses
        # (OrderedDict, defaultdict, ...) are accepted as query parameters too.
        assert isinstance(params, dict)

    if not item_id:
        if not item_name:
            raise ValueError('Either the name or id of the {} must be provided.'.format(item_type))
        item_id = self.get_item_id(item_type, item_name, collection_id=collection_id, collection_name=collection_name)

    res = self.get("/api/{}/{}".format(item_type, item_id), params=params)
    if res:
        return res
    raise ValueError('There is no {} with the id "{}"'.format(item_type, item_id))
def get_item_name(self, item_type, item_id):
    '''
    Return the 'name' field of the item with the given type and id.
    Raises ValueError when the request for the item returns a falsy result.
    '''
    assert item_type in ['database', 'table', 'card', 'collection', 'dashboard', 'pulse', 'segment']

    endpoint = "/api/{}/{}".format(item_type, item_id)
    item = self.get(endpoint)
    if not item:
        raise ValueError('There is no {} with the id "{}"'.format(item_type, item_id))
    return item['name']
def get_item_id(self, item_type, item_name, collection_id=None, collection_name=None, db_id=None, db_name=None, table_id=None):
    '''
    Return the id of the single item of the given type whose name is item_name.

    Search-space restrictions, depending on item_type:
      - card / dashboard / pulse: collection_id or collection_name (the name 'root'
        selects items whose collection_id is None); archived items are skipped.
      - table: db_id or db_name.
      - segment: table_id.

    Raises ValueError when no item or more than one item matches.
    '''
    assert item_type in ['database', 'table', 'card', 'collection', 'dashboard', 'pulse', 'segment']

    if item_type in ['card', 'dashboard', 'pulse']:
        # Work out the collection restriction first (this may hit the collection endpoints),
        # then fetch the item list once and filter it.
        restrict_to_collection = True
        if collection_id:
            collection_name = self.get_item_name('collection', collection_id)
        elif collection_name:
            if collection_name == 'root':
                collection_id = None
            else:
                collection_id = self.get_item_id('collection', collection_name)
        else:
            # Collection name/id is not provided. Searching in all collections
            restrict_to_collection = False

        matches = []
        for entry in self.get("/api/{}/".format(item_type)):
            if not entry['name'] == item_name:
                continue
            if restrict_to_collection and not entry['collection_id'] == collection_id:
                continue
            # Deliberately an equality test against False: a None value does not count as "not archived".
            if entry['archived'] == False:
                matches.append(entry['id'])

        found = len(matches)
        if found > 1:
            if not collection_name:
                raise ValueError('There is more than one {} with the name "{}".\n'
                                 'Provide collection id/name to limit the search space'.format(item_type, item_name))
            raise ValueError('There is more than one {} with the name "{}" in the collection "{}"'
                             .format(item_type, item_name, collection_name))
        if found == 0:
            if not collection_name:
                raise ValueError('There is no {} with the name "{}"'.format(item_type, item_name))
            raise ValueError('There is no item with the name "{}" in the collection "{}"'
                             .format(item_name, collection_name))
        return matches[0]

    if item_type == 'collection':
        matches = [entry['id'] for entry in self.get("/api/collection/") if entry['name'] == item_name]
        found = len(matches)
        if found > 1:
            raise ValueError('There is more than one collection with the name "{}"'.format(item_name))
        if found == 0:
            raise ValueError('There is no collection with the name "{}"'.format(item_name))
        return matches[0]

    if item_type == 'database':
        payload = self.get("/api/database/")
        # in Metabase version *.40.0 the format of the returned result for this endpoint changed
        if type(payload) is dict:
            payload = payload['data']
        matches = [entry['id'] for entry in payload if entry['name'] == item_name]
        found = len(matches)
        if found > 1:
            raise ValueError('There is more than one DB with the name "{}"'.format(item_name))
        if found == 0:
            raise ValueError('There is no DB with the name "{}"'.format(item_name))
        return matches[0]

    if item_type == 'table':
        all_tables = self.get("/api/table/")
        # db_id takes precedence over db_name; with neither, every database is searched.
        if db_id:
            in_requested_db = lambda tbl: tbl['db']['id'] == db_id
        elif db_name:
            in_requested_db = lambda tbl: tbl['db']['name'] == db_name
        else:
            in_requested_db = lambda tbl: True
        matches = [tbl['id'] for tbl in all_tables if tbl['name'] == item_name and in_requested_db(tbl)]
        found = len(matches)
        if found > 1:
            raise ValueError('There is more than one table with the name {}. Provide db id/name.'.format(item_name))
        if found == 0:
            raise ValueError('There is no table with the name "{}" (in the provided db, if any)'.format(item_name))
        return matches[0]

    if item_type == 'segment':
        matches = []
        for entry in self.get("/api/segment/"):
            if entry['name'] == item_name and (not table_id or entry['table_id'] == table_id):
                matches.append(entry['id'])
        found = len(matches)
        if found > 1:
            raise ValueError('There is more than one segment with the name "{}"'.format(item_name))
        if found == 0:
            raise ValueError('There is no segment with the name "{}"'.format(item_name))
        return matches[0]
def get_collection_id(self, collection_name):
    '''
    Deprecated: use get_item_id('collection', ...) instead.

    Return the id of the collection named collection_name, as listed by "/api/collection/".
    Raises ValueError when no collection or more than one collection has that name.
    '''
    import warnings
    # stacklevel=2 attributes the warning to the code that called this helper,
    # which is the code that has to be migrated.
    warnings.warn("The function get_collection_id will be removed in the next version. Use get_item_id function instead.",
                  DeprecationWarning, stacklevel=2)

    collection_IDs = [i['id'] for i in self.get("/api/collection/") if i['name'] == collection_name]

    if len(collection_IDs) > 1:
        raise ValueError('There is more than one collection with the name "{}"'.format(collection_name))
    if len(collection_IDs) == 0:
        raise ValueError('There is no collection with the name "{}"'.format(collection_name))

    return collection_IDs[0]
def get_db_id(self, db_name):
    '''
    Deprecated: use get_item_id('database', ...) instead.

    Return the id of the database named db_name, as listed by "/api/database/".
    Raises ValueError when no database or more than one database has that name.
    '''
    import warnings
    # stacklevel=2 attributes the warning to the code that called this helper,
    # which is the code that has to be migrated.
    warnings.warn("The function get_db_id will be removed in the next version. Use get_item_id function instead.",
                  DeprecationWarning, stacklevel=2)

    res = self.get("/api/database/")
    # in Metabase version *.40.0 the format of the returned result for this endpoint changed
    if isinstance(res, dict):
        res = res['data']
    db_IDs = [i['id'] for i in res if i['name'] == db_name]

    if len(db_IDs) > 1:
        raise ValueError('There is more than one DB with the name "{}"'.format(db_name))
    if len(db_IDs) == 0:
        raise ValueError('There is no DB with the name "{}"'.format(db_name))

    return db_IDs[0]
def get_table_id(self, table_name, db_name=None, db_id=None):
    '''
    Deprecated: use get_item_id('table', ...) instead.

    Return the id of the table named table_name, as listed by "/api/table/".
    The search can be limited to one database with db_id or db_name (db_id takes precedence).
    Raises ValueError when no table or more than one table matches.
    '''
    import warnings
    # stacklevel=2 attributes the warning to the code that called this helper,
    # which is the code that has to be migrated.
    warnings.warn("The function get_table_id will be removed in the next version. Use get_item_id function instead.",
                  DeprecationWarning, stacklevel=2)

    tables = self.get("/api/table/")

    if db_id:
        table_IDs = [i['id'] for i in tables if i['name'] == table_name and i['db']['id'] == db_id]
    elif db_name:
        table_IDs = [i['id'] for i in tables if i['name'] == table_name and i['db']['name'] == db_name]
    else:
        table_IDs = [i['id'] for i in tables if i['name'] == table_name]

    if len(table_IDs) > 1:
        raise ValueError('There is more than one table with the name {}. Provide db id/name.'.format(table_name))
    if len(table_IDs) == 0:
        raise ValueError('There is no table with the name "{}" (in the provided db, if any)'.format(table_name))

    return table_IDs[0]
def get_segment_id(self, segment_name, table_id=None):
    '''
    Deprecated: use get_item_id('segment', ...) instead.

    Return the id of the segment named segment_name, as listed by "/api/segment/".
    When table_id is given, only segments with that table_id are considered.
    Raises ValueError when no segment or more than one segment matches.
    '''
    import warnings
    # stacklevel=2 attributes the warning to the code that called this helper,
    # which is the code that has to be migrated.
    warnings.warn("The function get_segment_id will be removed in the next version. Use get_item_id function instead.",
                  DeprecationWarning, stacklevel=2)

    segment_IDs = [i['id'] for i in self.get("/api/segment/")
                   if i['name'] == segment_name and (not table_id or i['table_id'] == table_id)]

    if len(segment_IDs) > 1:
        raise ValueError('There is more than one segment with the name "{}"'.format(segment_name))
    if len(segment_IDs) == 0:
        raise ValueError('There is no segment with the name "{}"'.format(segment_name))

    return segment_IDs[0]
def get_db_id_from_table_id(self, table_id):
    '''
    Return the 'db_id' of the table whose id is table_id, as listed by "/api/table/".
    Raises ValueError when no listed table has that id.
    '''
    owning_db_ids = []
    for table in self.get("/api/table/"):
        if table['id'] == table_id:
            owning_db_ids.append(table['db_id'])

    if not owning_db_ids:
        raise ValueError('There is no DB containing the table with the ID "{}"'.format(table_id))
    return owning_db_ids[0]
def get_db_info(self, db_name=None, db_id=None, params=None):
    '''
    Deprecated: use get_item_info('database', ...) instead.

    Return Database info. Use 'params' for providing arguments.
    For example to include tables in the result, use: params={'include':'tables'}

    The database is addressed by db_id; when no id is given, db_name is resolved
    through get_item_id. Raises ValueError when neither is provided.
    '''
    import warnings
    # stacklevel=2 attributes the warning to the code that called this helper,
    # which is the code that has to be migrated.
    warnings.warn("The function get_db_info will be removed in the next version. Use get_item_info function instead.",
                  DeprecationWarning, stacklevel=2)

    if params:
        # isinstance instead of an exact type comparison, so that dict subclasses are accepted too.
        assert isinstance(params, dict)

    if not db_id:
        if not db_name:
            raise ValueError('Either the name or id of the DB needs to be provided.')
        db_id = self.get_item_id('database', db_name)

    return self.get("/api/database/{}".format(db_id), params=params)
def get_table_metadata(self, table_name=None, table_id=None, db_name=None, db_id=None, params=None):
    '''
    Return the result of "/api/table/<table_id>/query_metadata" for the given table.

    The table is addressed by table_id; when no id is given, table_name is resolved
    through get_item_id (db_name/db_id are forwarded to narrow that lookup).
    'params', when given, must be a dict and is passed along with the request.

    Raises ValueError when neither the table name nor the table id is provided.
    '''
    if params:
        # isinstance instead of an exact type comparison, so that dict subclasses are accepted too.
        assert isinstance(params, dict)

    if not table_id:
        if not table_name:
            raise ValueError('Either the name or id of the table needs to be provided.')
        table_id = self.get_item_id('table', table_name, db_name=db_name, db_id=db_id)

    return self.get("/api/table/{}/query_metadata".format(table_id), params=params)
def get_database_name_id(self, db_id=None, db_name=None, column_id_name=False):
    '''
    Return a dictionary describing the tables of the given database.

    By default the keys are table names and each value is {"id": <table id>, "columns": {}}.
    If column_id_name is True, the keys are table ids and each value is
    {"name": <table name>, "columns": {}}.

    Raises ValueError when neither db_id nor db_name is provided.
    '''
    if not db_id and not db_name:
        raise ValueError(
            "Either the name or id of the target database needs to be provided."
        )
    if not db_id:
        db_id = self.get_item_id("database", db_name)

    database = self.get_item_info("database", db_id, params={"include": "tables"})

    tables_by_key = {}
    for tbl in database["tables"]:
        if column_id_name:
            tables_by_key[tbl["id"]] = {"name": tbl["name"], "columns": {}}
        else:
            tables_by_key[tbl["name"]] = {"id": tbl["id"], "columns": {}}
    return tables_by_key
def get_columns_name_id(self, table_name=None, db_name=None, table_id=None, db_id=None, verbose=False, column_id_name=False):
    '''
    Return a dictionary with col_name key and col_id value, for the given table_id/table_name in the given db_id/db_name.
    If column_id_name is True, return a dictionary with col_id key and col_name value.

    Raises ValueError when friendly_names_is_disabled() reports False, or when neither
    the table name nor the table id is provided.
    '''
    if not self.friendly_names_is_disabled():
        raise ValueError('Please disable "Friendly Table and Field Names" from Admin Panel > Settings > General, and try again.')

    # The field listing is filtered by table name, so a name is always needed.
    if not table_name:
        if not table_id:
            raise ValueError('Either the name or id of the table must be provided.')
        table_name = self.get_item_name(item_type='table', item_id=table_id)

    # Resolve the database: an explicit id wins, then the db name, then the table's own db.
    if not db_id:
        if db_name:
            db_id = self.get_item_id('database', db_name)
        else:
            if not table_id:
                table_id = self.get_item_id('table', table_name)
            db_id = self.get_db_id_from_table_id(table_id)

    # Keep only the fields of the requested table, then orient the mapping as requested.
    table_fields = [field for field in self.get("/api/database/{}/fields".format(db_id))
                    if field['table_name'] == table_name]
    if column_id_name:
        return {field['id']: field['name'] for field in table_fields}
    return {field['name']: field['id'] for field in table_fields}
def friendly_names_is_disabled(self):
    '''
    Report whether "Friendly Table and Field Names" (Admin Panel > Settings > General) is disabled.

    The endpoint /api/database/:db-id/fields, which get_columns_name_id uses, relies on the display
    name of fields, and that setting changes the display names when it is not disabled. So the
    setting has to be off before get_columns_name_id is run.
    '''
    # Reading the setting needs admin access. Non-admin users are let through without the check
    # so they can still use the package (the original note says __init__ warns these users).
    if not self.is_admin:
        return True

    strategy_values = []
    for setting in self.get('/api/setting'):
        if setting['key'] == 'humanization-strategy':
            strategy_values.append(setting['value'])

    # 'none' means disabled
    return strategy_values[0] == 'none'
@staticmethod
def verbose_print(verbose, msg):
    '''
    Print msg when verbose is truthy; otherwise do nothing.
    '''
    if not verbose:
        return
    print(msg)