Skip to content
This repository was archived by the owner on Dec 11, 2023. It is now read-only.

Commit fa35edb

Browse files
authored
Merge pull request #20 from mitre-attack/feature/diff-stix-subtechniques
Feature/diff stix subtechniques
2 parents 570bef7 + 2c91962 commit fa35edb

3 files changed

Lines changed: 130 additions & 22 deletions

File tree

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
.DS_Store

CHANGELOG.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,7 @@
1+
# Changes staged on develop
2+
## Improvements
3+
- Updated [diff_stix.py](scripts/diff_stix.py) with sub-techniques support. See issue [#12](https://github.com/mitre-attack/attack-scripts/issues/12).
4+
15
# V1.3 - 8 January 2019
26
## New Scripts
37
- Added [diff_stix.py](scripts/diff_stix.py).

scripts/diff_stix.py

Lines changed: 125 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -122,7 +122,16 @@ def __init__(
122122
# }
123123
# software...
124124
}
125+
self.stixIDToName = {} # stixID to object name
126+
self.new_subtechnique_of_rels = [] # all subtechnique-of relationships in the new data
127+
self.old_subtechnique_of_rels = [] # all subtechnique-of relationships in the old data
128+
self.new_id_to_technique = {} # stixID => technique for every technique in the new data
129+
self.old_id_to_technique = {} # stixID => technique for every technique in the old data
130+
# build the bove data structures
125131
self.load_data()
132+
# remove duplicate relationships
133+
self.new_subtechnique_of_rels = [i for n, i in enumerate(self.new_subtechnique_of_rels) if i not in self.new_subtechnique_of_rels[n+1:]]
134+
self.old_subtechnique_of_rels = [i for n, i in enumerate(self.old_subtechnique_of_rels) if i not in self.old_subtechnique_of_rels[n+1:]]
126135

127136

128137
def verboseprint(self, *args, **kwargs):
@@ -186,26 +195,43 @@ def load_datastore(data_store):
186195
"data_store": data_store
187196
}
188197

198+
def parse_subtechniques(data_store, new=False):
199+
# parse dataStore sub-technique-of relationships
200+
if new:
201+
for technique in list(data_store.query(attackTypeToStixFilter["technique"])):
202+
self.new_id_to_technique[technique["id"]] = technique
203+
self.new_subtechnique_of_rels += list(data_store.query([
204+
Filter("type", "=", "relationship"),
205+
Filter("relationship_type", "=", "subtechnique-of")
206+
]))
207+
else:
208+
for technique in list(data_store.query(attackTypeToStixFilter["technique"])):
209+
self.old_id_to_technique[technique["id"]] = technique
210+
self.old_subtechnique_of_rels += list(data_store.query([
211+
Filter("type", "=", "relationship"),
212+
Filter("relationship_type", "=", "subtechnique-of")
213+
]))
214+
189215
# load data from directory according to domain
190-
def load_dir(dir):
216+
def load_dir(dir, new=False):
191217
data_store = MemoryStore()
192218
datafile = os.path.join(dir, domain + ".json")
193219
data_store.load_from_file(datafile)
194-
220+
parse_subtechniques(data_store, new)
195221
return load_datastore(data_store)
196222

197223
# load data from TAXII server according to domain
198-
def load_taxii():
224+
def load_taxii(new=False):
199225
collection = Collection("https://cti-taxii.mitre.org/stix/collections/" + domainToTaxiiCollectionId[domain])
200226
data_store = TAXIICollectionSource(collection)
201-
227+
parse_subtechniques(data_store, new)
202228
return load_datastore(data_store)
203229

204230
if self.use_taxii:
205-
old = load_taxii()
231+
old = load_taxii(False)
206232
else:
207-
old = load_dir(self.old)
208-
new = load_dir(self.new)
233+
old = load_dir(self.old, False)
234+
new = load_dir(self.new, True)
209235

210236
intersection = old["keys"] & new["keys"]
211237
additions = new["keys"] - old["keys"]
@@ -226,7 +252,12 @@ def load_taxii():
226252
Filter('type', '=', 'relationship'),
227253
Filter('relationship_type', '=', 'revoked-by'),
228254
Filter('source_ref', '=', key)
229-
])[0]["target_ref"]
255+
])
256+
if (len(revoked_by_key) == 0):
257+
print("WARNING: revoked object", key, "has no revoked-by relationship")
258+
continue
259+
else: revoked_by_key = revoked_by_key[0]["target_ref"]
260+
230261
new["id_to_obj"][key]["revoked_by"] = new["id_to_obj"][revoked_by_key]
231262

232263
revocations.add(key)
@@ -240,11 +271,11 @@ def load_taxii():
240271
try:
241272
old_version = float(old["id_to_obj"][key]["x_mitre_version"])
242273
except:
243-
print("old\n\t" +key)
274+
print("ERROR: cannot get old version for object: " + key)
244275
try:
245276
new_version = float(new["id_to_obj"][key]["x_mitre_version"])
246277
except:
247-
print("new\n\t" + key)
278+
print("ERROR: cannot get new version for object: " + key)
248279

249280
# check for changes
250281
if new_version > old_version:
@@ -307,12 +338,92 @@ def get_md_key(self):
307338
key += "\n" + "* Object deletions: " + statusDescriptions['deletions']
308339
return f"{key}"
309340

341+
def has_subtechniques(self, sdo, new=False):
342+
"""return true or false depending on whether the SDO has sub-techniques. new determines whether to parse from the new or old data"""
343+
if new: return len(list(filter(lambda rel: rel["target_ref"] == sdo["id"], self.new_subtechnique_of_rels))) > 0
344+
else: return len(list(filter(lambda rel: rel["target_ref"] == sdo["id"], self.old_subtechnique_of_rels))) > 0
310345

311346
def get_markdown_string(self):
312347
"""
313348
Return a markdown string summarizing detected differences.
314349
"""
315350

351+
def getSectionList(items, obj_type, section):
352+
"""
353+
parse a list of items in a section and return a string for the items
354+
"""
355+
356+
# get parents which have children
357+
childless = list(filter(lambda item: not self.has_subtechniques(item, True) and not ("x_mitre_is_subtechnique" in item and item["x_mitre_is_subtechnique"]), items))
358+
parents = list(filter(lambda item: self.has_subtechniques(item, True) and not ("x_mitre_is_subtechnique" in item and item["x_mitre_is_subtechnique"]), items))
359+
children = { item["id"]: item for item in filter(lambda item: "x_mitre_is_subtechnique" in item and item["x_mitre_is_subtechnique"], items) }
360+
361+
subtechnique_of_rels = self.new_subtechnique_of_rels if section != "deletions" else self.old_subtechnique_of_rels
362+
id_to_technique = self.new_id_to_technique if section != "deletions" else self.old_id_to_technique
363+
364+
parentToChildren = {} # stixID => [ children ]
365+
for relationship in subtechnique_of_rels:
366+
if relationship["target_ref"] in parentToChildren:
367+
if relationship["source_ref"] in children:
368+
parentToChildren[relationship["target_ref"]].append(children[relationship["source_ref"]])
369+
else:
370+
if relationship["source_ref"] in children:
371+
parentToChildren[relationship["target_ref"]] = children[relationship["source_ref"]]
372+
parentToChildren[relationship["target_ref"]] = [ children[relationship["source_ref"]] ]
373+
374+
375+
# now group parents and children
376+
groupings = []
377+
378+
for parent in childless + parents:
379+
parent_children = parentToChildren.pop(parent["id"]) if parent["id"] in parentToChildren else []
380+
groupings.append({
381+
"parent": parent,
382+
"parentInSection": True,
383+
"children": parent_children
384+
})
385+
386+
for parentID in parentToChildren:
387+
groupings.append({
388+
"parent": id_to_technique[parentID],
389+
"parentInSection": False,
390+
"children": parentToChildren[parentID]
391+
})
392+
393+
groupings = sorted(groupings, key=lambda grouping: grouping["parent"]["name"])
394+
395+
def placard(item):
396+
"""get a section list item for the given SDO according to section type"""
397+
if section == "revocations":
398+
revoker = item['revoked_by']
399+
if "x_mitre_is_subtechnique" in revoker and revoker["x_mitre_is_subtechnique"]:
400+
# get revoking technique's parent for display
401+
parentID = list(filter(lambda rel: rel["source_ref"] == revoker["id"], subtechnique_of_rels))[0]["target_ref"]
402+
parentName = id_to_technique[parentID]["name"] if parentID in id_to_technique else "ERROR NO PARENT"
403+
return f"{item['name']} (revoked by { parentName}: [{revoker['name']}]({self.site_prefix}/{self.getUrlFromStix(revoker)}))"
404+
else:
405+
return f"{item['name']} (revoked by [{revoker['name']}]({self.site_prefix}/{self.getUrlFromStix(revoker)}))"
406+
if section == "deletions":
407+
return f"{item['name']}"
408+
else:
409+
return f"[{item['name']}]({self.site_prefix}/{self.getUrlFromStix(item)})"
410+
411+
412+
# build sectionList string
413+
sectionString = ""
414+
for grouping in groupings:
415+
if grouping["parentInSection"]:
416+
sectionString += f"* { placard(grouping['parent']) }\n"
417+
# else:
418+
# sectionString += f"* _{grouping['parent']['name']}_\n"
419+
for child in sorted(grouping["children"], key=lambda child: child["name"]):
420+
if grouping["parentInSection"]:
421+
sectionString += f"\t* {placard(child) }\n"
422+
else:
423+
sectionString += f"* { grouping['parent']['name'] }: { placard(child) }\n"
424+
425+
return sectionString
426+
316427
self.verboseprint("generating markdown string... ", end="", flush="true")
317428

318429
content = ""
@@ -321,17 +432,9 @@ def get_markdown_string(self):
321432
for domain in self.data[obj_type]:
322433
domain_sections = ""
323434
for section in self.data[obj_type][domain]:
324-
if section == "revocations":
325-
# handle revoked by
326-
section_items = list(map(lambda d: f"* {d['name']} (revoked by [{d['revoked_by']['name']}]({self.site_prefix}/{self.getUrlFromStix(d['revoked_by'])}))", self.data[obj_type][domain][section]))
327-
elif section == "deletions":
328-
section_items = list(map(lambda d: f"* {d['name']}", self.data[obj_type][domain][section]))
329-
else:
330-
section_items = list(map(lambda d: f"* [{d['name']}]({self.site_prefix}/{self.getUrlFromStix(d)})", self.data[obj_type][domain][section]))
331-
332-
if len(section_items) > 0:
333-
section_items = "\n".join(sorted(section_items))
334-
else:
435+
if len(self.data[obj_type][domain][section]) > 0: # if there are items in the section
436+
section_items = getSectionList(self.data[obj_type][domain][section], obj_type, section)
437+
else: # no items in section
335438
section_items = "No changes"
336439
header = sectionNameToSectionHeaders[section] + ":"
337440
if "{obj_type}" in header:
@@ -385,7 +488,7 @@ def get_layers_dict(self):
385488

386489
# build layer structure
387490
layer_json = {
388-
"version": "2.2",
491+
"version": "3.0",
389492
"name": f"{thedate} {domainToDomainLabel[domain]} Updates",
390493
"description": f"{domainToDomainLabel[domain]} updates for the {thedate} release of ATT&CK",
391494
"domain": domainToLayerFileDomain[domain],

0 commit comments

Comments
 (0)