import findspark

# findspark.init() locates the local Spark installation and makes PySpark
# importable, so it is called before `import pyspark`.
findspark.init()
import pyspark

# Dependency note: the `tweet_parser` package must be installed before this
# file runs. The original line here was the bare command
# `pip install tweet_parser`, which is a shell / notebook command and a
# SyntaxError in Python source. Install it once with
# `pip install tweet_parser` in a shell (or `%pip install tweet_parser`
# inside a notebook cell).

# Returns the Spark home that findspark resolved; shown as cell output when
# run interactively. One call is enough -- the original made it twice.
findspark.find()
import math
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql import Row
from pyspark.sql.functions import lit
from pyspark.sql.types import StringType
from pyspark.sql import SQLContext
from tweet_parser.tweet import Tweet
from tweet_parser.tweet_parser_errors import NotATweetError
import fileinput
import json
# Directory prefix for the data files used below (tweets.json is read from
# it and tweets_geo.csv is written to it).
data_path = 'data/OPI/'

# Obtain the Spark session for this script: an existing session is reused if
# one is active, otherwise a new one is created under this application name.
spark = (
    SparkSession.builder
    .appName('SparkSQL_Python')
    .getOrCreate()
)
# I will extract every tweet that carries geographic data into a separate file, "tweets_geo.csv"; this makes the data easier to manipulate when querying it with Spark SQL.
# Extract the geotagged tweets from the line-delimited JSON dump into a small
# CSV file (one row per tweet: Mongo object id plus the two `geo` coordinates).
file_path = data_path + 'tweets.json'
geo_csv_path = data_path + 'tweets_geo.csv'

# Number of valid tweets that carry no `geo` information. The original code
# incremented this counter without ever initialising it, which raised a
# NameError on the first tweet without geo data.
countn = 0

# The output is opened once, in write mode: appending (as before) produced a
# second header line and duplicated rows every time this step was re-run, and
# re-opening the file for every tweet was needlessly slow.
with open(file_path, encoding='utf-8') as f, \
        open(geo_csv_path, 'w', newline='', encoding='utf-8') as outfile:
    # NOTE(review): the header labels are kept exactly as the downstream query
    # expects them, but coords[0] is written under `long` and coords[1] under
    # `lat`. Twitter's legacy `geo` object is documented as
    # [latitude, longitude]; if this dump follows that convention the two
    # labels are swapped relative to the values they hold -- confirm.
    outfile.write('_id,long,lat\n')

    for line in f:
        try:
            tweet = Tweet(json.loads(line))
        except (json.JSONDecodeError, NotATweetError):
            # Best effort: lines that are not valid JSON or not tweets are
            # skipped on purpose.
            continue

        geo = tweet['geo']
        if geo is None:
            countn += 1
            continue

        coords = geo['coordinates']
        row = (tweet['_id']['$oid'], coords[0], coords[1])
        outfile.write(','.join(str(value) for value in row) + '\n')
# Check the contents of the newly created file.
# Read the generated CSV back through Spark, treating its first line as the
# column header, and print a preview of the resulting DataFrame.
file_path = '{}tweets_geo.csv'.format(data_path)
df3 = spark.read.csv(file_path, header=True)
df3.show()
# Since Python was the language selected for all of these exercises, tools from the Python software development stack are used here, although Node.js could also be a fantastic tool for completing this exercise.
# Count the geotagged tweets that lie within `dist` miles of a centre point.
df3.createOrReplaceTempView('tweets_geo')

EARTH_RADIUS_MILES = 3956
MILES_PER_DEGREE = 69  # approximate length of one degree of latitude

# Centre of the search. The original code had these two values swapped
# (orig_lat = -100.858301 is outside the valid latitude range of [-90, 90]),
# which fed a longitude into the latitude terms of the haversine formula and
# into the cos() correction of the bounding box.
orig_lat = 20.55256
orig_lon = -100.858301
dist = 0.621371  # search radius in miles (1 km)

# Cheap rectangular pre-filter around the centre. A degree of longitude
# shrinks with cos(latitude), so the longitude half-width is widened
# accordingly.
lat_delta = dist / MILES_PER_DEGREE
lon_delta = dist / abs(math.cos(math.radians(orig_lat)) * MILES_PER_DEGREE)
lon1 = orig_lon - lon_delta
lon2 = orig_lon + lon_delta
lat1 = orig_lat - lat_delta
lat2 = orig_lat + lat_delta

# NOTE(review): the extraction step writes geo.coordinates[0] into the CSV
# column `long` and coordinates[1] into `lat`. Twitter's legacy `geo` object
# is documented as [latitude, longitude], so the column labels are swapped
# relative to their contents; the innermost SELECT re-labels them (and casts
# the string columns to DOUBLE explicitly). Confirm against the data if the
# dump does not follow that convention.
#
# The ORDER BY of the original inner query was dropped: ordering rows that are
# only counted has no effect on the result.
query = """
    SELECT COUNT(dss.distance) AS counter
    FROM (
        SELECT {radius} * 2 * ASIN(SQRT(
                   POWER(SIN(({lat} - geo.latitude) * pi() / 180 / 2), 2)
                   + COS({lat} * pi() / 180)
                     * COS(geo.latitude * pi() / 180)
                     * POWER(SIN(({lon} - geo.longitude) * pi() / 180 / 2), 2)
               )) AS distance
        FROM (
            SELECT CAST(`long` AS DOUBLE) AS latitude,
                   CAST(`lat` AS DOUBLE) AS longitude
            FROM tweets_geo
        ) geo
        WHERE geo.longitude BETWEEN {lon1} AND {lon2}
          AND geo.latitude BETWEEN {lat1} AND {lat2}
    ) dss
    WHERE dss.distance < {dist}
""".format(
    radius=EARTH_RADIUS_MILES,
    lat=orig_lat,
    lon=orig_lon,
    lon1=lon1,
    lon2=lon2,
    lat1=lat1,
    lat2=lat2,
    dist=dist,
)

df3_sql = spark.sql(query)
df3_sql.first()['counter']