-
Notifications
You must be signed in to change notification settings - Fork 32
Expand file tree
/
Copy path: extract-comments.py
More file actions
39 lines (33 loc) · 1.51 KB
/
extract-comments.py
File metadata and controls
39 lines (33 loc) · 1.51 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode, col, filter, size, transform
# Scratch directory Spark uses for local/shuffle temp files.
spark_dir = "/data/tmp/"

# Local-mode session for this extraction job: worker cleanup enabled,
# 8G driver memory, 10 executor cores, 16 local threads.
spark = (
    SparkSession.builder
    .config("spark.worker.cleanup.enabled", "true")
    .config("spark.local.dir", spark_dir)
    .config("spark.driver.memory", "8G")
    .config("spark.executor.cores", 10)
    .master("local[16]")
    .appName('spark-stats')
    .getOrCreate()
)

# Raw GraphQL responses, one JSON object per line, across many files.
df = spark.read.json("/data/comments-*.jsonl")

# One row per issue edge, keeping the issues-level pagination flag alongside.
df2 = df.select([
    "data.repository.issues.pageInfo.hasNextPage",
    explode("data.repository.issues.edges").alias("issue"),
])

# One row per comment node, with issue metadata and the comment-level
# pagination flag/cursor needed to fetch any remaining comment pages.
df3 = df2.select([
    col("issue.node.number").alias("issue_no"),
    col("issue.node.databaseId").alias("issue_id"),
    col("issue.node.createdAt").alias("issue_created_at"),
    col("issue.node.comments.pageInfo.hasNextPage").alias("has_more_comments"),
    col("issue.node.comments.pageInfo.endCursor").alias("next_comments_cursor"),
    explode("issue.node.comments.nodes").alias("comment"),
])
def filter_reactions(x):
    """Predicate for pyspark `filter`: keep reaction groups someone used."""
    total = x.reactors.totalCount
    return total > 0
def transform_reactions(x):
    """Map a reaction group to a single-entry {content: count} dict.

    e.g. a group with content "THUMBS_UP" and 2 reactors -> {"THUMBS_UP": 2}.
    Fix: removed a leftover debug `print(x)` that would run once per element
    on the executors, spamming worker stdout.

    NOTE(review): this helper is not referenced anywhere in the visible file —
    presumably intended as the lambda for `transform(...)` (imported above);
    confirm before deleting.
    """
    return {x.content: x.reactors.totalCount}
# Final comment-level schema: keep only reaction groups with at least one
# reactor, then de-duplicate rows on the comment's databaseId (presumably
# overlapping pages can return the same comment twice — verify upstream).
df4 = (
    df3.select([
        "issue_no",
        "issue_id",
        "issue_created_at",
        "has_more_comments",
        "next_comments_cursor",
        "comment.databaseId",
        "comment.authorAssociation",
        col("comment.author.login").alias("comment_author"),
        col("comment.body").alias("comment_body"),
        # NOTE: `filter` here is pyspark.sql.functions.filter (the import at
        # the top shadows the builtin); it filters the array column per row.
        filter("comment.reactionGroups", filter_reactions).alias("reaction_groups"),
    ])
    .dropDuplicates(["databaseId"])
)

# Persist the flattened, de-duplicated comments as parquet.
df4.write.parquet("/data/comments")