Skip to content
This repository was archived by the owner on Dec 11, 2023. It is now read-only.

Commit 82b14d8

Browse files
committed
consolidate do_groups, do_mitigations into a singular method do_mapping
add support to for `software` operation, fix issue not allowing mobile-attack operations
1 parent f967bb3 commit 82b14d8

1 file changed

Lines changed: 54 additions & 66 deletions

File tree

scripts/filter_techniques_by_category.py

Lines changed: 54 additions & 66 deletions
Original file line numberDiff line numberDiff line change
@@ -21,45 +21,46 @@ def build_taxii_source(collection_name):
2121
return stix2.MemorySource(stix_data=taxii_ds.query())
2222

2323

24-
def get_all_techniques(src):
24+
def get_all_techniques(src, source_name):
2525
"""Filters data source by attack-pattern which extracts all ATT&CK Techniques"""
2626
filters = [
27-
stix2.Filter('type', '=', 'attack-pattern'),
28-
stix2.Filter('external_references.source_name', '=', 'mitre-attack'),
27+
stix2.Filter("type", "=", "attack-pattern"),
28+
stix2.Filter("external_references.source_name", "=", source_name),
2929
]
3030
results = src.query(filters)
3131
return remove_deprecated(results)
3232

3333

34-
def filter_for_term_relationships(src, relationship_type, object_id, source=True):
34+
def filter_for_term_relationships(src, relationship_type, object_id, target=True):
3535
"""Filters data source by relationship that matches type and source or target"""
3636
filters = [
3737
stix2.Filter("type", "=", "relationship"),
3838
stix2.Filter("relationship_type", "=", relationship_type),
3939
]
40-
if source:
41-
filters.append(stix2.Filter("source_ref", "=", object_id))
42-
else:
40+
if target:
4341
filters.append(stix2.Filter("target_ref", "=", object_id))
42+
else:
43+
filters.append(stix2.Filter("source_ref", "=", object_id))
4444

4545
results = src.query(filters)
4646
return remove_deprecated(results)
4747

4848

49-
def filter_by_type_and_id(src, object_type, object_id):
49+
def filter_by_type_and_id(src, object_type, object_id, source_name):
5050
"""Filters data source by id and type"""
5151
filters = [
5252
stix2.Filter("type", "=", object_type),
5353
stix2.Filter("id", "=", object_id),
54+
stix2.Filter("external_references.source_name", "=", source_name),
5455
]
5556
results = src.query(filters)
5657
return remove_deprecated(results)
5758

5859

59-
def grab_external_id(stix_object):
60+
def grab_external_id(stix_object, source_name):
6061
"""Grab external id from STIX2 object"""
6162
for external_reference in stix_object.get("external_references", []):
62-
if external_reference.get("source_name") == "mitre-attack":
63+
if external_reference.get("source_name") == source_name:
6364
return external_reference["external_id"]
6465

6566

@@ -86,85 +87,72 @@ def arg_parse():
8687
"""Function to handle script arguments."""
8788
parser = argparse.ArgumentParser(description="Fetches the current ATT&CK content expressed as STIX2 and creates spreadsheet matching Techniques with Mitigations or Groups.")
8889
parser.add_argument("-c", "--collection", type=str, required=True, choices=["enterprise_attack", "mobile_attack"], help="Which collection to use (Enterprise, Mobile).")
89-
parser.add_argument("-o", "--operation", type=str, required=True, choices=["groups", "mitigations"], help="Operation to perform on ATT&CK content.")
90+
parser.add_argument("-o", "--operation", type=str, required=True, choices=["groups", "mitigations", "software"], help="Operation to perform on ATT&CK content.")
9091
parser.add_argument("-s", "--save", type=str, required=False, help="Save the CSV file with a different filename.")
9192
return parser
9293

9394

94-
def do_groups(ds):
95-
"""Main logic to match techniques to groups"""
96-
all_attack_patterns = get_all_techniques(ds)
95+
def do_mapping(ds, fieldnames, relationship_type, type_filter, source_name, sorting_keys):
96+
"""Main logic to map techniques to mitigations, groups or software"""
97+
all_attack_patterns = get_all_techniques(ds, source_name)
9798
writable_results = []
9899

99100
for attack_pattern in all_attack_patterns:
100-
tid = grab_external_id(attack_pattern)
101-
102-
# Grabs uses relationships for identified techniques
103-
relationships = filter_for_term_relationships(ds, "uses", attack_pattern.id, source=False)
101+
# Grabs relationships for identified techniques
102+
relationships = filter_for_term_relationships(ds, relationship_type, attack_pattern.id)
104103

105104
for relationship in relationships:
106105
# Groups are defined in STIX as intrusion-set objects
107-
groups = filter_by_type_and_id(ds, "intrusion-set", relationship.source_ref)
108-
109-
if groups:
110-
group = groups[0]
111-
gid = grab_external_id(group)
112-
writable_results.append(
113-
{
114-
"TID": tid,
115-
"Technique Name": attack_pattern.name,
116-
"GID": gid,
117-
"Group Name": group.name,
118-
"Group Description": group.description,
119-
"Usage": relationship.description
120-
}
106+
# Mitigations are defined in STIX as course-of-action objects
107+
# Software are defined in STIX as malware objects
108+
stix_results = filter_by_type_and_id(ds, type_filter, relationship.source_ref, source_name)
109+
110+
if stix_results:
111+
row_data = (
112+
grab_external_id(attack_pattern, source_name),
113+
attack_pattern.name,
114+
grab_external_id(stix_results[0], source_name),
115+
stix_results[0].name,
116+
escape_chars(stix_results[0].description),
117+
escape_chars(relationship.description),
121118
)
122-
return sorted(writable_results, key=lambda x: (x["TID"], x["GID"]))
123-
124-
125-
def do_mitigations(ds):
126-
"""Main logic to match techniques to mitigations"""
127-
all_attack_patterns = get_all_techniques(ds)
128-
writable_results = []
129-
130-
for attack_pattern in all_attack_patterns:
131-
tid = grab_external_id(attack_pattern)
132119

133-
# Grabs mitigation relationships for identified techniques
134-
relationships = filter_for_term_relationships(ds, "mitigates", attack_pattern.id, source=False)
120+
writable_results.append(dict(zip(fieldnames, row_data)))
135121

136-
for relationship in relationships:
137-
# Mitigations are defined in STIX as course-of-action objects
138-
mitigation = filter_by_type_and_id(ds, "course-of-action", relationship.source_ref)
139-
140-
if mitigation:
141-
mitigation = mitigation[0]
142-
mid = grab_external_id(mitigation)
143-
writable_results.append(
144-
{
145-
"TID": tid,
146-
"Technique Name": attack_pattern.name,
147-
"MID": mid,
148-
"Mitigation Name": mitigation.name,
149-
"Mitigation Description": escape_chars(mitigation.description),
150-
"Application": escape_chars(relationship.description),
151-
}
152-
)
153-
return sorted(writable_results, key=lambda x: (x["TID"], x["MID"]))
122+
return sorted(writable_results, key=lambda x: (x[sorting_keys[0]], x[sorting_keys[1]]))
154123

155124

156125
def main(args):
157126
data_source = build_taxii_source(args.collection)
158127
op = args.operation
159128

129+
source_map = {
130+
"enterprise_attack": "mitre-attack",
131+
"mobile_attack": "mitre-mobile-attack",
132+
}
133+
source_name = source_map[args.collection]
134+
160135
if op == "groups":
161136
filename = args.save or "groups.csv"
162-
fieldnames = ["TID", "Technique Name", "GID", "Group Name", "Group Description", "Usage"]
163-
rowdicts = do_groups(data_source)
137+
fieldnames = ("TID", "Technique Name", "GID", "Group Name", "Group Description", "Usage")
138+
relationship_type = "uses"
139+
type_filter = "intrusion-set"
140+
sorting_keys = ("TID", "GID")
141+
rowdicts = do_mapping(data_source, fieldnames, relationship_type, type_filter, source_name, sorting_keys)
164142
elif op == "mitigations":
165143
filename = args.save or "mitigations.csv"
166-
fieldnames = ["TID", "Technique Name", "MID", "Mitigation Name", "Mitigation Description", "Application"]
167-
rowdicts = do_mitigations(data_source)
144+
fieldnames = ("TID", "Technique Name", "MID", "Mitigation Name", "Mitigation Description", "Application")
145+
relationship_type = "mitigates"
146+
type_filter = "course-of-action"
147+
sorting_keys = ("TID", "MID")
148+
rowdicts = do_mapping(data_source, fieldnames, relationship_type, type_filter, source_name, sorting_keys)
149+
elif op == "software":
150+
filename = args.save or "software.csv"
151+
fieldnames = ("TID", "Technique Name", "SID", "Software Name", "Software Description", "Use")
152+
relationship_type = "uses"
153+
type_filter = "malware"
154+
sorting_keys = ("TID", "SID")
155+
rowdicts = do_mapping(data_source, fieldnames, relationship_type, type_filter, source_name, sorting_keys)
168156
else:
169157
raise RuntimeError("Unknown option: %s" % op)
170158

0 commit comments

Comments
 (0)