In [ ]:
import findspark

# findspark.init() has to run before pyspark is imported; it is what makes the
# local Spark installation's pyspark importable from this kernel.
findspark.init()
import pyspark

# Display the Spark home that was located (shown as this cell's output).
findspark.find()
Out[ ]:
'C:\\spark'
In [ ]:
pip install tweet_parser
In [ ]:
import math
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql import Row
from pyspark.sql.functions import lit
from pyspark.sql.types import StringType
from pyspark.sql import SQLContext
from tweet_parser.tweet import Tweet
from tweet_parser.tweet_parser_errors import NotATweetError
import fileinput
import json
In [ ]:
# Create the Spark session for this notebook, or reuse the one that is
# already running in this process.
spark = (
    SparkSession.builder
    .appName('SparkSQL_Python')
    .getOrCreate()
)
In [ ]:
# Base directory of the input tweets.json and of the generated tweets_geo.csv.
# Keep the trailing slash: file paths are built from it by string concatenation.
data_path = 'data/OPI/'

DATA EXPLORATION AND PREPARATION

I will extract every tweet that carries geographic information into a separate file, "tweets_geo.csv", holding the tweet id and its coordinates. Having this data in its own file makes it easier to query with Spark SQL.

In [ ]:
# Copy the id and coordinates of every geotagged tweet in tweets.json into
# tweets_geo.csv (one tweet per input line, one CSV row per geotagged tweet).
file_path = data_path + 'tweets.json'
geo_csv_path = data_path + 'tweets_geo.csv'

# Header row. The names are kept as they are because the query cells below
# reference the columns `long` and `lat`.
# NOTE(review): in the output shown further down, the `lat` column holds values
# such as -117.46, which is outside the valid latitude range, so the two
# coordinate columns look mislabelled (coords[0] is probably the latitude).
# Renaming them requires updating the query cells at the same time.
str_cor = '_id' + ',' + 'long' + ',' + 'lat'

# Number of valid tweets skipped because they carry no geo information.
# Initialised here so the cell runs on a fresh kernel.
countn = 0

# The output is opened once, in write mode, so re-running the cell rebuilds
# the file instead of appending a second header and duplicate rows.
# The input is read as UTF-8 rather than the platform default encoding.
with open(file_path, encoding='utf-8') as f, \
        open(geo_csv_path, 'w', newline='', encoding='utf-8') as outfile:
    outfile.write(str_cor + "\n")

    for line in f:
        try:
            tweet = Tweet(json.loads(line))
        except (json.JSONDecodeError, NotATweetError):
            # Best effort: lines that are not valid JSON or not tweets are ignored.
            continue

        geo = tweet.get('geo')
        if geo is None:
            countn += 1
            continue

        coords = geo['coordinates']
        row = str(tweet['_id']['$oid']) + ',' + str(coords[0]) + ',' + str(coords[1])
        outfile.write(row + "\n")

Checking the new file

In [ ]:
# Load the generated CSV back through Spark and preview the first rows.
# The first line of the file is used as the column names; no schema is given,
# so the column types are whatever Spark's CSV reader assigns by default.
file_path = data_path + 'tweets_geo.csv'
csv_reader = spark.read.option("header", "true")
df3 = csv_reader.csv(file_path)
df3.show()
+--------------------+---------+-----------+
|                 _id|     long|        lat|
+--------------------+---------+-----------+
|5473c7ebf555990ab...|34.043134|-117.464278|
|5473c7ebf555990ab...|32.755309| -97.099929|
|5473c7ebf555990ab...|32.713385| -97.081261|
|5473c7ebf555990ab...|35.369164|-119.110548|
|5473c7ebf555990ab...|36.189774|-115.068433|
|5473c7ebf555990ab...|33.805866|-117.921487|
|5473c7ebf555990ab...|30.181008| -97.777566|
|5473c7ebf555990ab...| 34.29341| -118.50683|
|5473c7ebf555990ab...|32.738588|-117.101484|
|5473c7ebf555990ab...|34.750892| -92.112252|
|5473c7ebf555990ab...|30.680976| -95.527534|
|5473c7ebf555990ab...|33.948667| -118.09165|
|5473c7ebf555990ab...|29.959288| -90.112532|
|5473c7ebf555990ab...|19.338004| -99.111639|
|5473c7ebf555990ab...|29.886629| -95.705032|
|5473c7ebf555990ab...|19.396404| -99.174546|
|5473c7ebf555990ab...|25.976383| -97.521357|
|5473c7ebf555990ab...|34.180936|-118.405688|
|5473c7ebf555990ab...|19.278723| -99.234307|
|5473c7ebf555990ab...| 34.19007|-118.397954|
+--------------------+---------+-----------+
only showing top 20 rows

CHOOSING THE TOOLS

Since Python was the language chosen for all of these exercises, tools from the Python software development stack will be used, although Node.js would also be a fantastic tool for completing this exercise.

  • PyCharm: It is a software development environment for the Python language.
  • Flask: Flask is a popular, extensible web microframework for building web applications with Python.
In [ ]:
# Register df3 as the temporary view `tweets_geo` so that spark.sql() queries
# can refer to it by name.
df3.createOrReplaceTempView('tweets_geo')
In [ ]:
# Centre point and radius of the search, plus the bounding box around it that
# the query uses to pre-filter rows.
# NOTE(review): -100.858301 is outside the valid latitude range [-90, 90], so
# orig_lat / orig_lon appear to hold the longitude / latitude respectively.
# This matches the column labelling of tweets_geo.csv; confirm before reusing.
orig_lat = -100.858301
orig_lon = 20.55256
dist = 0.621371  # presumably miles (about 1 km), to match the 69 used below

# Half-widths of the box in degrees; 69 is presumably the approximate number
# of miles per degree of latitude.
lat_delta = dist / 69
lon_delta = dist / abs(math.cos(math.radians(orig_lat)) * 69)

lon1, lon2 = orig_lon - lon_delta, orig_lon + lon_delta
lat1, lat2 = orig_lat - lat_delta, orig_lat + lat_delta
In [ ]:
# Count the tweets that lie within `dist` of the point (orig_lat, orig_lon).
#
# The inner query first narrows tweets_geo to the bounding box
# [lon1, lon2] x [lat1, lat2] and computes a haversine distance for each
# remaining row; the outer query counts the rows closer than `dist`.
# The ORDER BY of the original inner query was dropped: the rows are only
# counted, so sorting them first cannot change the result.
#
# 3956 is presumably the Earth's radius in miles, which makes `distance` miles.
# NOTE(review): the formula takes the cosine of orig_lat and of the `lat`
# column, but both appear to hold longitudes (orig_lat is -100.858301 and the
# `lat` column shows values such as -117.46). If so, the computed distances
# are not true great-circle distances. Confirm and fix together with the
# CSV header and the constants above.
distance_query = """
    SELECT count(dss.distance) AS counter
    FROM (
        SELECT 3956 * 2 * ASIN(SQRT(
                   POWER(SIN(({orig_lat} - (lat)) * pi()/180 / 2), 2)
                   + COS({orig_lat} * pi()/180) * COS((lat) * pi()/180)
                     * POWER(SIN(({orig_lon} - long) * pi()/180 / 2), 2)
               )) AS distance
        FROM tweets_geo
        WHERE long BETWEEN {lon1} AND {lon2}
          AND lat BETWEEN {lat1} AND {lat2}
    ) dss
    WHERE dss.distance < {dist}
""".format(
    orig_lat=orig_lat,
    orig_lon=orig_lon,
    lon1=lon1,
    lon2=lon2,
    lat1=lat1,
    lat2=lat2,
    dist=dist,
)

df3_sql = spark.sql(distance_query)

df3_sql.first()['counter']
Out[ ]:
17