import findspark
findspark.init()
findspark.find()
import pyspark
findspark.find()
import math
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql import Row
from pyspark.sql.functions import lit
from pyspark.sql.types import StringType
from pyspark.sql import SQLContext
from tweet_parser.tweet import Tweet
from tweet_parser.tweet_parser_errors import NotATweetError
import fileinput
import json
# Start (or reuse) a Spark session for the airport SQL exercise.
spark = SparkSession.builder.appName('SparkSQL_Python_airport').getOrCreate()

# Location of the demo dataset.
data_path = 'data/OPI/'
file_path = data_path + 'demo.csv'

# Load the demo CSV, treating the first row as the header.
df1 = (spark.read
            .format('csv')
            .option("header", "true")
            .load(file_path))
df1.show()
The problem asks to keep the same number of records after the inner join, but the table has many incorrect records and missing values in several columns, and the goal of the exercise is not clearly stated. If I build the dictionary only with the country name and a country id assigned by me, the inner join will not work because there is no country id column to join on, and the problem does not specify whether I may modify the structure of this dataset by adding more columns. For this reason I will use only the airline_id column as the id in the newly generated csv; after that I will perform the inner join using the airline_id column in order to complete the exercise.
# Build the lookup table: one row per airline id with its country.
dict_countries = df1.select('airline_id', 'country')
dict_countries.show()
# Sample of airline names observed in the data: AEROVARADERO, ALL STAR, ASA PESADA, S, AVEMEX, LAP, AIRPAC, AIREX, ODINN, RENTAXEL, ASUR, ATCO, AVIANCA, Oman, UNIFORM OSCAR, ASTORIA
Generating the new csv, which we will call "dict.csv"
# Persist the lookup table as "dict.csv", renaming the key column so it
# reads as a country identifier in the generated file.
file_path = data_path + 'dict.csv'
pandas_dict = dict_countries.toPandas().rename(columns={'airline_id': 'country_id'})
pandas_dict.to_csv(file_path, header=True, mode='w', index=False)
Reading the generated csv file
# Read the freshly written dictionary CSV back into a Spark DataFrame.
df2 = (spark.read
            .format('csv')
            .option("header", "true")
            .load(file_path))
df2.show()
Creating two temporary views in order to perform the inner join
# Register both DataFrames as temp views so they can be joined via SQL.
df1.createOrReplaceTempView('demo')
df2.createOrReplaceTempView('dict')

# Inner join on the shared airline identifier: every demo row matches
# exactly one dictionary row, so the record count is preserved.
df_sql = spark.sql(
    "select d.*, di.*"
    " from demo d "
    "inner join dict as di on d.airline_id = di.country_id "
)
df_sql.show()
print(df_sql.count())