Skip to content
This repository was archived by the owner on Dec 11, 2023. It is now read-only.

Commit b337569

Browse files
committed
Add new script that creates CSVs for technique mitigations or groups
1 parent 2a10b04 commit b337569

1 file changed

Lines changed: 180 additions & 0 deletions

File tree

Lines changed: 180 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,180 @@
1+
import argparse
2+
import csv
3+
import io
4+
5+
import stix2
6+
import taxii2client
7+
8+
9+
def build_taxii_source(collection_name):
10+
"""Downloads latest Enterprise ATT&CK content from GitHub."""
11+
# Establish TAXII2 Collection instance for Enterprise ATT&CK collection
12+
collection_map = {
13+
"enterprise_attack": "95ecc380-afe9-11e4-9b6c-751b66dd541e",
14+
"mobile_attack": "2f669986-b40b-4423-b720-4396ca6a462b"
15+
}
16+
collection_url = "https://cti-taxii.mitre.org/stix/collections/" + collection_map[collection_name] + "/"
17+
collection = taxii2client.Collection(collection_url)
18+
taxii_ds = stix2.TAXIICollectionSource(collection)
19+
20+
# Create an in-memory source (to prevent multiple web requests)
21+
return stix2.MemorySource(stix_data=taxii_ds.query())
22+
23+
24+
def get_all_techniques(src):
25+
"""Filters data source by attack-pattern which extracts all ATT&CK Techniques"""
26+
filters = [
27+
stix2.Filter('type', '=', 'attack-pattern'),
28+
stix2.Filter('external_references.source_name', '=', 'mitre-attack'),
29+
]
30+
results = src.query(filters)
31+
return remove_deprecated(results)
32+
33+
34+
def filter_for_term_relationships(src, relationship_type, object_id, source=True):
35+
"""Filters data source by relationship that matches type and source or target"""
36+
filters = [
37+
stix2.Filter("type", "=", "relationship"),
38+
stix2.Filter("relationship_type", "=", relationship_type),
39+
]
40+
if source:
41+
filters.append(stix2.Filter("source_ref", "=", object_id))
42+
else:
43+
filters.append(stix2.Filter("target_ref", "=", object_id))
44+
45+
results = src.query(filters)
46+
return remove_deprecated(results)
47+
48+
49+
def filter_by_type_and_id(src, object_type, object_id):
50+
"""Filters data source by list of ids"""
51+
filters = [
52+
stix2.Filter("type", "=", object_type),
53+
stix2.Filter("id", "=", object_id),
54+
]
55+
results = src.query(filters)
56+
return remove_deprecated(results)
57+
58+
59+
def grab_external_id(stix_object):
60+
"""Grab external id from STIX2 object"""
61+
for external_reference in stix_object.get("external_references", []):
62+
if external_reference.get("source_name") == "mitre-attack":
63+
return external_reference["external_id"]
64+
65+
66+
def remove_deprecated(stix_objects):
67+
"""Will remove any revoked or deprecated objects from queries made to the data source"""
68+
# Note we use .get() because the property may not be present in the JSON data. The default is False
69+
# if the property is not set.
70+
return list(
71+
filter(
72+
lambda x: x.get("x_mitre_deprecated", False) is False and x.get("revoked", False) is False,
73+
stix_objects
74+
)
75+
)
76+
77+
78+
def escape_chars(a_string):
79+
"""Some characters create problems when written to file"""
80+
return a_string.translate(str.maketrans({
81+
"\n": r"\\n",
82+
}))
83+
84+
85+
def arg_parse():
86+
"""Function to handle script arguments."""
87+
parser = argparse.ArgumentParser(description="Fetches the current ATT&CK content expressed as STIX2 and creates spreadsheet matching Techniques with Mitigations or Groups.")
88+
parser.add_argument("-c", "--collection", type=str, required=True, choices=["enterprise_attack", "mobile_attack"], help="Which collection to use (Enterprise, Mobile).")
89+
parser.add_argument("-o", "--operation", type=str, required=True, choices=["groups", "mitigations"], help="Operation to perform on ATT&CK content.")
90+
parser.add_argument("-s", "--save", type=str, required=False, help="Save the CSV file with a different filename.")
91+
return parser
92+
93+
94+
def do_groups(ds):
95+
"""Main logic to match techniques to groups"""
96+
all_attack_patterns = get_all_techniques(ds)
97+
writable_results = []
98+
99+
for attack_pattern in all_attack_patterns:
100+
tid = grab_external_id(attack_pattern)
101+
102+
# Grabs uses relationships for identified techniques
103+
relationships = filter_for_term_relationships(ds, "uses", attack_pattern.id, source=False)
104+
105+
for relationship in relationships:
106+
# Groups are defined in STIX as intrusion-set objects
107+
groups = filter_by_type_and_id(ds, "intrusion-set", relationship.source_ref)
108+
109+
if groups:
110+
group = groups[0]
111+
gid = grab_external_id(group)
112+
writable_results.append(
113+
{
114+
"TID": tid,
115+
"Technique Name": attack_pattern.name,
116+
"GID": gid,
117+
"Group Name": group.name,
118+
"Group Description": group.description,
119+
"Usage": relationship.description
120+
}
121+
)
122+
return sorted(writable_results, key=lambda x: (x["TID"], x["GID"]))
123+
124+
125+
def do_mitigations(ds):
126+
"""Main logic to match techniques to mitigations"""
127+
all_attack_patterns = get_all_techniques(ds)
128+
writable_results = []
129+
130+
for attack_pattern in all_attack_patterns:
131+
tid = grab_external_id(attack_pattern)
132+
133+
# Grabs mitigation relationships for identified techniques
134+
relationships = filter_for_term_relationships(ds, "mitigates", attack_pattern.id, source=False)
135+
136+
for relationship in relationships:
137+
# Mitigations are defined in STIX as course-of-action objects
138+
mitigation = filter_by_type_and_id(ds, "course-of-action", relationship.source_ref)
139+
140+
if mitigation:
141+
mitigation = mitigation[0]
142+
mid = grab_external_id(mitigation)
143+
writable_results.append(
144+
{
145+
"TID": tid,
146+
"Technique Name": attack_pattern.name,
147+
"MID": mid,
148+
"Mitigation Name": mitigation.name,
149+
"Mitigation Description": escape_chars(mitigation.description),
150+
"Application": escape_chars(relationship.description),
151+
}
152+
)
153+
return sorted(writable_results, key=lambda x: (x["TID"], x["MID"]))
154+
155+
156+
def main(args):
157+
data_source = build_taxii_source(args.collection)
158+
op = args.operation
159+
160+
if op == "groups":
161+
filename = args.save or "groups.csv"
162+
fieldnames = ["TID", "Technique Name", "GID", "Group Name", "Group Description", "Usage"]
163+
rowdicts = do_groups(data_source)
164+
elif op == "mitigations":
165+
filename = args.save or "mitigations.csv"
166+
fieldnames = ["TID", "Technique Name", "MID", "Mitigation Name", "Mitigation Description", "Application"]
167+
rowdicts = do_mitigations(data_source)
168+
else:
169+
raise RuntimeError("Unknown option: %s" % op)
170+
171+
with io.open(filename, "w", newline="", encoding="utf-8") as csvfile:
172+
writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
173+
writer.writeheader()
174+
writer.writerows(rowdicts)
175+
176+
177+
if __name__ == "__main__":
178+
parser = arg_parse()
179+
args = parser.parse_args()
180+
main(args)

0 commit comments

Comments
 (0)