Skip to content

Commit e159cd5

Browse files
authored
Merge pull request #62 from zmap/phillip/bug-53-support-csv-input
Support CSV input format
2 parents cb30419 + 2275798 commit e159cd5

7 files changed

Lines changed: 393 additions & 26 deletions

File tree

Makefile

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,15 @@ uninstall:
2323
else \
2424
echo "Uninstallation cancelled."; \
2525
fi
26-
test:
26+
test: zannotate
2727
go test -v ./...
28+
@if [ ! -d ".venv" ]; then \
29+
python3 -m venv .venv; \
30+
fi
31+
@. .venv/bin/activate && \
32+
pip install -r requirements.txt -q && \
33+
pytest test_zannotate.py -v
34+
2835

2936
lint:
3037
goimports -w -local "github.com/zmap/zannotate" ./

README.md

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -164,6 +164,31 @@ echo "127.0.0.1" | zannotate --rdns --geoasn --geoasn-database=/path-to-geo-asn.
164164
```
165165

166166
## Input
167+
168+
### New-line Separated IPs
169+
By default, ZAnnotate expects new-line delimited IP addresses on standard input. For example:
170+
```shell
171+
printf "1.1.1.1\n8.8.8.8" | zannotate --rdns
172+
```
173+
174+
```jsonl
175+
{"ip":"1.1.1.1","rdns":{"domain_names":["one.one.one.one"]}}
176+
{"ip":"8.8.8.8","rdns":{"domain_names":["dns.google"]}}
177+
```
178+
179+
### JSON and CSV Flags
180+
181+
The `--output-annotation-field` flag can be used to specify a different field name for the annotations instead of `zannotate` for both CSV and JSON. For example:
182+
183+
```shell
184+
printf "name,ip_address,date\n cloudflare,1.1.1.1,04-04-26\n google,8.8.8.8,04-04-26" | ./zannotate --rdns --input-file-type=csv --input-ip-field=ip_address --output-annotation-field="info"
185+
```
186+
187+
```json lines
188+
{"name":" cloudflare","ip_address":"1.1.1.1","date":"04-04-26","info":{"rdns":{"domain_names":["one.one.one.one"]}}}
189+
{"ip_address":"8.8.8.8","date":"04-04-26","info":{"rdns":{"domain_names":["dns.google"]}},"name":" google"}
190+
```
191+
### JSON
167192
You may wish to annotate data that is already in JSON format. You'll then need to use the `--input-file-type=json` flag.
168193
This will insert a `zannotate` field into the existing JSON object. For example:
169194

@@ -175,6 +200,41 @@ echo '{"ip": "1.1.1.1"}' | ./zannotate --rdns --geoasn --geoasn-database=/path-
175200
{"ip":"1.1.1.1","zannotate":{"geoasn":{"asn":13335,"org":"CLOUDFLARENET"},"rdns":{"domain_names":["one.one.one.one"]}}}
176201
```
177202

203+
If your JSON objects have a different field for the IP address, you can specify that with the `--input-ip-field` flag. For example, if your JSON objects have an `ip_address` field instead of `ip`, you can use:
204+
205+
```shell
206+
echo '{"ip_address": "1.1.1.1"}' | ./zannotate --rdns --input-file-type=json --input-ip-field=ip_address
207+
```
208+
209+
```json
210+
{"ip_address":"1.1.1.1","zannotate":{"rdns":{"domain_names":["one.one.one.one"]}}}
211+
````
212+
213+
### CSV
214+
If your input data is in CSV format, you can use the `--input-file-type=csv` flag.
215+
216+
```shell
217+
printf "name,ip,date\n cloudflare,1.1.1.1,04-04-26\n google,8.8.8.8,04-04-26" | ./zannotate --rdns --input-file-type=csv
218+
```
219+
220+
```jsonl
221+
{"name":" cloudflare","ip":"1.1.1.1","date":"04-04-26","zannotate":{"rdns":{"domain_names":["one.one.one.one"]}}}
222+
{"name":" google","ip":"8.8.8.8","date":"04-04-26","zannotate":{"rdns":{"domain_names":["dns.google"]}}}
223+
```
224+
225+
Similar to JSON, you can use the `--input-ip-field` flag to specify a column other than `ip` that contains the IP address.
226+
227+
```shell
228+
printf "name,ip_address,date\n cloudflare,1.1.1.1,04-04-26\n google,8.8.8.8,04-04-26" | ./zannotate --rdns --input-file-type=csv --input-ip-field=ip_address
229+
```
230+
231+
```jsonl
232+
{"date":"04-04-26","zannotate":{"rdns":{"domain_names":["dns.google"]}},"name":" google","ip_address":"8.8.8.8"}
233+
{"date":"04-04-26","zannotate":{"rdns":{"domain_names":["one.one.one.one"]}},"name":" cloudflare","ip_address":"1.1.1.1"}
234+
```
235+
236+
237+
178238
# Modules
179239

180240
## RDAP (WHOIS successor)

annotate.go

Lines changed: 75 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@ package zannotate
1616

1717
import (
1818
"bufio"
19+
"encoding/csv"
20+
"slices"
1921

2022
"io"
2123
"net"
@@ -68,6 +70,41 @@ func ipToInProcess(line string) inProcessIP {
6870
return retv
6971
}
7072

73+
func csvToInProcess(record []string, headers []string, ipFieldName, annotationFieldName string) inProcessIP {
74+
var retv inProcessIP
75+
76+
if len(record) != len(headers) {
77+
log.Fatalf("csv record length (%d) does not match header length (%d) for record: %v", len(record), len(headers), record)
78+
}
79+
80+
outMap := make(map[string]interface{}, len(headers))
81+
for i, header := range headers {
82+
outMap[header] = record[i]
83+
}
84+
85+
foundIPField := false
86+
for i, header := range headers {
87+
switch header {
88+
case ipFieldName:
89+
foundIPField = true
90+
retv.Ip = net.ParseIP(record[i])
91+
if retv.Ip == nil {
92+
log.Fatalf("unable to parse IP at column '%d': %s", i, record[i])
93+
}
94+
case annotationFieldName:
95+
log.Fatalf("csv headers already contains annotation key '%v'", annotationFieldName)
96+
default:
97+
outMap[header] = record[i]
98+
}
99+
}
100+
if !foundIPField {
101+
log.Fatalf("unable to find IP address field with IP field name %s in CSV record: %v", ipFieldName, record)
102+
}
103+
104+
retv.Out = outMap
105+
return retv
106+
}
107+
71108
// single worker that reads from file and queues raw lines
72109
func AnnotateRead(conf *GlobalConf, path string, in chan<- string) {
73110
log.Debug("read thread started")
@@ -84,7 +121,20 @@ func AnnotateRead(conf *GlobalConf, path string, in chan<- string) {
84121
log.Debug("reading input from ", path)
85122
}
86123
r := bufio.NewReader(f)
87-
// read IPs out of JSON input
124+
if conf.InputFileType == "csv" {
125+
// Need to extract the header
126+
header, err := r.ReadString('\n')
127+
if err != nil {
128+
log.Fatal("unable to read the CSV headers", err.Error())
129+
}
130+
csvReader := csv.NewReader(strings.NewReader(header))
131+
fields, err := csvReader.Read()
132+
if err != nil {
133+
log.Fatal("unable to parse CSV headers: ", err.Error())
134+
}
135+
conf.csvHeaders = fields
136+
}
137+
// read IPs out of input
88138
for {
89139
line, err := r.ReadString('\n')
90140
if line != "" {
@@ -107,14 +157,28 @@ func AnnotateInputDecode(conf *GlobalConf, inChan <-chan string,
107157
outChan chan<- inProcessIP, wg *sync.WaitGroup, i int) {
108158
for line := range inChan {
109159
l := strings.TrimSuffix(line, "\n")
110-
if conf.InputFileType == "json" {
111-
val := jsonToInProcess(l, conf.JSONIPFieldName, conf.JSONAnnotationFieldName)
112-
if conf.JSONAnnotationFieldName != "" {
113-
val.Out[conf.JSONAnnotationFieldName] = make(map[string]interface{})
160+
switch conf.InputFileType {
161+
case "json":
162+
val := jsonToInProcess(l, conf.InputIPFieldName, conf.OutputAnnotationFieldName)
163+
if conf.OutputAnnotationFieldName != "" {
164+
val.Out[conf.OutputAnnotationFieldName] = make(map[string]interface{})
114165
}
115166
outChan <- val
116-
} else {
167+
case "csv":
168+
r := csv.NewReader(strings.NewReader(l))
169+
row, err := r.Read()
170+
if err != nil {
171+
log.Errorf("failed to parse CSV line (%s): %v", l, err)
172+
continue
173+
}
174+
val := csvToInProcess(row, conf.csvHeaders, conf.InputIPFieldName, conf.OutputAnnotationFieldName)
175+
if conf.OutputAnnotationFieldName != "" {
176+
val.Out[conf.OutputAnnotationFieldName] = make(map[string]interface{})
177+
}
178+
outChan <- val
179+
default:
117180
outChan <- ipToInProcess(l)
181+
118182
}
119183
}
120184
log.Debug("decode thread ", i, " done")
@@ -160,7 +224,7 @@ func AnnotateWrite(path string, out <-chan string, wg *sync.WaitGroup) {
160224
log.Debug("write thread finished")
161225
}
162226

163-
func AnnotateWorker(a Annotator, inChan <-chan inProcessIP,
227+
func AnnotateWorker(conf *GlobalConf, a Annotator, inChan <-chan inProcessIP,
164228
outChan chan<- inProcessIP, fieldName string, wg *sync.WaitGroup, i int) {
165229
name := a.GetFieldName()
166230
log.Debug("annotate worker (", name, ") ", i, " started")
@@ -169,7 +233,7 @@ func AnnotateWorker(a Annotator, inChan <-chan inProcessIP,
169233
log.Fatal("error initializing annotate worker: ", err)
170234
}
171235
for inProcess := range inChan {
172-
if fieldName != "" {
236+
if fieldName != "" && slices.Contains([]string{"json", "csv"}, conf.InputFileType) {
173237
p := inProcess.Out[fieldName].(map[string]interface{})
174238
p[name] = a.Annotate(inProcess.Ip)
175239
} else {
@@ -198,6 +262,7 @@ func DoAnnotation(conf *GlobalConf) {
198262
inDecoded := make(chan inProcessIP)
199263
// read input file
200264
go AnnotateRead(conf, conf.InputFilePath, inRaw)
265+
201266
// decode input data
202267
var decodeWG sync.WaitGroup
203268
for i := 0; i < conf.InputDecodeThreads; i++ {
@@ -213,8 +278,8 @@ func DoAnnotation(conf *GlobalConf) {
213278
if annotator.IsEnabled() {
214279
var annotateWG sync.WaitGroup
215280
for i := 0; i < annotator.GetWorkers(); i++ {
216-
go AnnotateWorker(annotator.MakeAnnotator(i), lastChannel, nextChannel,
217-
conf.JSONAnnotationFieldName, &annotateWG, i)
281+
go AnnotateWorker(conf, annotator.MakeAnnotator(i), lastChannel, nextChannel,
282+
conf.OutputAnnotationFieldName, &annotateWG, i)
218283
annotateWG.Add(1)
219284
}
220285
lastChannel = nextChannel

cmd/zannotate/main.go

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ package main
1717
import (
1818
"flag"
1919
"os"
20+
"slices"
2021

2122
log "github.com/sirupsen/logrus"
2223

@@ -35,8 +36,8 @@ func main() {
3536
flags.StringVar(&conf.LogFilePath, "log-file", "", "where should JSON logs be saved")
3637
flags.IntVar(&conf.Verbosity, "verbosity", 3, "log verbosity: 1 (lowest)--5 (highest)")
3738
// json annotation configuration
38-
flags.StringVar(&conf.JSONIPFieldName, "json-ip-field", "ip", "key in JSON that contains IP address")
39-
flags.StringVar(&conf.JSONAnnotationFieldName, "json-annotation-field", "zannotate", "key that metadata is injected at")
39+
flags.StringVar(&conf.InputIPFieldName, "input-ip-field", "ip", "key in JSON or column in CSV that contains IP address")
40+
flags.StringVar(&conf.OutputAnnotationFieldName, "output-annotation-field", "zannotate", "key that metadata is injected at, used for both CSV and JSON file inputs to preserve data in the input file")
4041
// encode/decode threads
4142
flags.IntVar(&conf.InputDecodeThreads, "input-decode-threads", 3, "number of golang processes to decode input data (e.g., json)")
4243
flags.IntVar(&conf.OutputEncodeThreads, "output-encode-threads", 3, "number of golang processes to encode output data (e.g., json)")
@@ -71,7 +72,7 @@ func main() {
7172
default:
7273
log.Fatal("Unknown verbosity level specified. Must be between 1 (lowest)--5 (highest)")
7374
}
74-
if conf.InputFileType != "ips" && conf.InputFileType != "json" {
75+
if !slices.Contains([]string{"ips", "json", "csv"}, conf.InputFileType) {
7576
log.Fatal("invalid input file type")
7677
}
7778
// check if we have any annotations to be performed
@@ -87,7 +88,7 @@ func main() {
8788
}
8889
// perform sanity checks
8990
if conf.InputFileType == "ips" {
90-
conf.JSONAnnotationFieldName = ""
91+
conf.OutputAnnotationFieldName = ""
9192
}
9293

9394
// perform annotation

conf.go

Lines changed: 13 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -45,17 +45,19 @@ type BasePluginConf struct {
4545
// global library configuration
4646

4747
type GlobalConf struct {
48-
InputFilePath string
49-
InputFileType string
50-
OutputFilePath string
51-
MetadataFilePath string
52-
LogFilePath string
53-
Verbosity int
54-
Threads int
55-
JSONIPFieldName string
56-
JSONAnnotationFieldName string
57-
InputDecodeThreads int
58-
OutputEncodeThreads int
48+
InputFilePath string
49+
InputFileType string
50+
OutputFilePath string
51+
MetadataFilePath string
52+
LogFilePath string
53+
Verbosity int
54+
Threads int
55+
InputIPFieldName string
56+
OutputAnnotationFieldName string
57+
InputDecodeThreads int
58+
OutputEncodeThreads int
59+
60+
csvHeaders []string // Header row of a CSV file, used when parsing a CSV input
5961
}
6062

6163
var Annotators []AnnotatorFactory

requirements.txt

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
iniconfig==2.3.0
2+
packaging==26.0
3+
pluggy==1.6.0
4+
Pygments==2.20.0
5+
pytest==9.0.2

0 commit comments

Comments
 (0)