|
| 1 | +import argparse |
| 2 | +import csv |
| 3 | +import io |
| 4 | + |
| 5 | +import stix2 |
| 6 | +import taxii2client |
| 7 | +import tqdm |
| 8 | + |
| 9 | + |
| 10 | +def build_taxii_source(collection_name): |
| 11 | + """Downloads latest Enterprise or Mobile ATT&CK content from MITRE TAXII Server.""" |
| 12 | + # Establish TAXII2 Collection instance for Enterprise ATT&CK collection |
| 13 | + collection_map = { |
| 14 | + "enterprise_attack": "95ecc380-afe9-11e4-9b6c-751b66dd541e", |
| 15 | + "mobile_attack": "2f669986-b40b-4423-b720-4396ca6a462b" |
| 16 | + } |
| 17 | + collection_url = "https://cti-taxii.mitre.org/stix/collections/" + collection_map[collection_name] + "/" |
| 18 | + collection = taxii2client.Collection(collection_url) |
| 19 | + taxii_ds = stix2.TAXIICollectionSource(collection) |
| 20 | + |
| 21 | + # Create an in-memory source (to prevent multiple web requests) |
| 22 | + return stix2.MemorySource(stix_data=taxii_ds.query()) |
| 23 | + |
| 24 | + |
| 25 | +def get_all_techniques(src, source_name): |
| 26 | + """Filters data source by attack-pattern which extracts all ATT&CK Techniques""" |
| 27 | + filters = [ |
| 28 | + stix2.Filter("type", "=", "attack-pattern"), |
| 29 | + stix2.Filter("external_references.source_name", "=", source_name), |
| 30 | + ] |
| 31 | + results = src.query(filters) |
| 32 | + return remove_deprecated(results) |
| 33 | + |
| 34 | + |
| 35 | +def filter_for_term_relationships(src, relationship_type, object_id, target=True): |
| 36 | + """Filters data source by type, relationship_type and source or target""" |
| 37 | + filters = [ |
| 38 | + stix2.Filter("type", "=", "relationship"), |
| 39 | + stix2.Filter("relationship_type", "=", relationship_type), |
| 40 | + ] |
| 41 | + if target: |
| 42 | + filters.append(stix2.Filter("target_ref", "=", object_id)) |
| 43 | + else: |
| 44 | + filters.append(stix2.Filter("source_ref", "=", object_id)) |
| 45 | + |
| 46 | + results = src.query(filters) |
| 47 | + return remove_deprecated(results) |
| 48 | + |
| 49 | + |
| 50 | +def filter_by_type_and_id(src, object_type, object_id, source_name): |
| 51 | + """Filters data source by id and type""" |
| 52 | + filters = [ |
| 53 | + stix2.Filter("type", "=", object_type), |
| 54 | + stix2.Filter("id", "=", object_id), |
| 55 | + stix2.Filter("external_references.source_name", "=", source_name), |
| 56 | + ] |
| 57 | + results = src.query(filters) |
| 58 | + return remove_deprecated(results) |
| 59 | + |
| 60 | + |
| 61 | +def grab_external_id(stix_object, source_name): |
| 62 | + """Grab external id from STIX2 object""" |
| 63 | + for external_reference in stix_object.get("external_references", []): |
| 64 | + if external_reference.get("source_name") == source_name: |
| 65 | + return external_reference["external_id"] |
| 66 | + |
| 67 | + |
| 68 | +def remove_deprecated(stix_objects): |
| 69 | + """Will remove any revoked or deprecated objects from queries made to the data source""" |
| 70 | + # Note we use .get() because the property may not be present in the JSON data. The default is False |
| 71 | + # if the property is not set. |
| 72 | + return list( |
| 73 | + filter( |
| 74 | + lambda x: x.get("x_mitre_deprecated", False) is False and x.get("revoked", False) is False, |
| 75 | + stix_objects |
| 76 | + ) |
| 77 | + ) |
| 78 | + |
| 79 | + |
| 80 | +def escape_chars(a_string): |
| 81 | + """Some characters create problems when written to file""" |
| 82 | + return a_string.translate(str.maketrans({ |
| 83 | + "\n": r"\\n", |
| 84 | + })) |
| 85 | + |
| 86 | + |
| 87 | +def arg_parse(): |
| 88 | + """Function to handle script arguments.""" |
| 89 | + parser = argparse.ArgumentParser(description="Fetches the current ATT&CK content expressed as STIX2 and creates spreadsheet mapping Techniques with Mitigations, Groups or Software.") |
| 90 | + parser.add_argument("-d", "--domain", type=str, required=True, choices=["enterprise_attack", "mobile_attack"], help="Which ATT&CK domain to use (Enterprise, Mobile).") |
| 91 | + parser.add_argument("-m", "--mapping-type", type=str, required=True, choices=["groups", "mitigations", "software"], help="Which type of object to output mappings for using ATT&CK content.") |
| 92 | + parser.add_argument("-s", "--save", type=str, required=False, help="Save the CSV file with a different filename.") |
| 93 | + return parser |
| 94 | + |
| 95 | + |
| 96 | +def do_mapping(ds, fieldnames, relationship_type, type_filter, source_name, sorting_keys): |
| 97 | + """Main logic to map techniques to mitigations, groups or software""" |
| 98 | + all_attack_patterns = get_all_techniques(ds, source_name) |
| 99 | + writable_results = [] |
| 100 | + |
| 101 | + for attack_pattern in tqdm.tqdm(all_attack_patterns, desc="parsing data for techniques"): |
| 102 | + # Grabs relationships for identified techniques |
| 103 | + relationships = filter_for_term_relationships(ds, relationship_type, attack_pattern.id) |
| 104 | + |
| 105 | + for relationship in relationships: |
| 106 | + # Groups are defined in STIX as intrusion-set objects |
| 107 | + # Mitigations are defined in STIX as course-of-action objects |
| 108 | + # Software are defined in STIX as malware objects |
| 109 | + stix_results = filter_by_type_and_id(ds, type_filter, relationship.source_ref, source_name) |
| 110 | + |
| 111 | + if stix_results: |
| 112 | + row_data = ( |
| 113 | + grab_external_id(attack_pattern, source_name), |
| 114 | + attack_pattern.name, |
| 115 | + grab_external_id(stix_results[0], source_name), |
| 116 | + stix_results[0].name, |
| 117 | + escape_chars(stix_results[0].description), |
| 118 | + escape_chars(relationship.description), |
| 119 | + ) |
| 120 | + |
| 121 | + writable_results.append(dict(zip(fieldnames, row_data))) |
| 122 | + |
| 123 | + return sorted(writable_results, key=lambda x: (x[sorting_keys[0]], x[sorting_keys[1]])) |
| 124 | + |
| 125 | + |
| 126 | +def main(args): |
| 127 | + data_source = build_taxii_source(args.domain) |
| 128 | + op = args.mapping_type |
| 129 | + |
| 130 | + source_map = { |
| 131 | + "enterprise_attack": "mitre-attack", |
| 132 | + "mobile_attack": "mitre-mobile-attack", |
| 133 | + } |
| 134 | + source_name = source_map[args.domain] |
| 135 | + |
| 136 | + if op == "groups": |
| 137 | + filename = args.save or "groups.csv" |
| 138 | + fieldnames = ("TID", "Technique Name", "GID", "Group Name", "Group Description", "Usage") |
| 139 | + relationship_type = "uses" |
| 140 | + type_filter = "intrusion-set" |
| 141 | + sorting_keys = ("TID", "GID") |
| 142 | + rowdicts = do_mapping(data_source, fieldnames, relationship_type, type_filter, source_name, sorting_keys) |
| 143 | + elif op == "mitigations": |
| 144 | + filename = args.save or "mitigations.csv" |
| 145 | + fieldnames = ("TID", "Technique Name", "MID", "Mitigation Name", "Mitigation Description", "Application") |
| 146 | + relationship_type = "mitigates" |
| 147 | + type_filter = "course-of-action" |
| 148 | + sorting_keys = ("TID", "MID") |
| 149 | + rowdicts = do_mapping(data_source, fieldnames, relationship_type, type_filter, source_name, sorting_keys) |
| 150 | + elif op == "software": |
| 151 | + filename = args.save or "software.csv" |
| 152 | + fieldnames = ("TID", "Technique Name", "SID", "Software Name", "Software Description", "Use") |
| 153 | + relationship_type = "uses" |
| 154 | + type_filter = "malware" |
| 155 | + sorting_keys = ("TID", "SID") |
| 156 | + rowdicts = do_mapping(data_source, fieldnames, relationship_type, type_filter, source_name, sorting_keys) |
| 157 | + else: |
| 158 | + raise RuntimeError("Unknown option: %s" % op) |
| 159 | + |
| 160 | + with io.open(filename, "w", newline="", encoding="utf-8") as csvfile: |
| 161 | + writer = csv.DictWriter(csvfile, fieldnames=fieldnames) |
| 162 | + writer.writeheader() |
| 163 | + writer.writerows(rowdicts) |
| 164 | + |
| 165 | + |
| 166 | +if __name__ == "__main__": |
| 167 | + parser = arg_parse() |
| 168 | + args = parser.parse_args() |
| 169 | + main(args) |
0 commit comments