In [1]:
import findspark

# Initialise findspark before importing pyspark so the local Spark
# installation can be located; `import pyspark` must come after init().
findspark.init()
import pyspark

# Display the Spark home directory findspark found (the cell's output).
findspark.find()
Out[1]:
'C:\\spark'
In [3]:
import math
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql import Row
from pyspark.sql.functions import lit 
from pyspark.sql.types import StringType
from pyspark.sql import SQLContext
from tweet_parser.tweet import Tweet
from tweet_parser.tweet_parser_errors import NotATweetError
import fileinput
import json
In [4]:
# Create (or reuse, if one already exists) the Spark session used by all later cells.
spark = (
    SparkSession.builder
    .appName('SparkSQL_Python_airport')
    .getOrCreate()
)
In [6]:
# Folder (relative to the working directory) holding demo.csv and the generated dict.csv.
# Kept as a plain string ending in "/" because later cells append file names with `+`.
data_path = "data/OPI/"

DATA EXPLORATION

In [7]:
# Load the raw airline table; the first line of demo.csv is used as the header row.
# No schema is supplied, so Spark reads every column as a string.
# NOTE(review): several columns contain the literal marker `\N` (see output below);
# passing nullValue='\\N' would turn those into real nulls - left unchanged here.
file_path = data_path + 'demo.csv'
df1 = spark.read.csv(file_path, header=True)
df1.show()
+----------+--------------------+-----+----+----+--------------+--------------+------+
|airline_id|                name|alias|iata|icao|      callsign|       country|active|
+----------+--------------------+-----+----+----+--------------+--------------+------+
|        -1|             Unknown|   \N|   -| N/A|            \N|            \N|     Y|
|         1|      Private flight|   \N|   -| N/A|          null|          null|     Y|
|         2|         135 Airways|   \N|null| GNL|       GENERAL| United States|     N|
|         3|       1Time Airline|   \N|  1T| RNX|       NEXTIME|  South Africa|     Y|
|         4|2 Sqn No 1 Elemen...|   \N|null| WYT|          null|United Kingdom|     N|
|         5|     213 Flight Unit|   \N|null| TFU|          null|        Russia|     N|
|         6|223 Flight Unit S...|   \N|null| CHD|CHKALOVSK-AVIA|        Russia|     N|
|         7|   224th Flight Unit|   \N|null| TTF|    CARGO UNIT|        Russia|     N|
|         8|         247 Jet Ltd|   \N|null| TWF|  CLOUD RUNNER|United Kingdom|     N|
|         9|         3D Aviation|   \N|null| SEC|       SECUREX| United States|     N|
|        10|         40-Mile Air|   \N|  Q5| MLA|      MILE-AIR| United States|     Y|
|        11|              4D Air|   \N|null| QRT|       QUARTET|      Thailand|     N|
|        12|611897 Alberta Li...|   \N|null| THD|         DONUT|        Canada|     N|
|        13|    Ansett Australia|   \N|  AN| AAA|        ANSETT|     Australia|     Y|
|        14|Abacus International|   \N|  1B|null|          null|     Singapore|     Y|
|        15|     Abelag Aviation|   \N|  W9| AAB|           ABG|       Belgium|     N|
|        16|      Army Air Corps|   \N|null| AAC|       ARMYAIR|United Kingdom|     N|
|        17|Aero Aviation Cen...|   \N|null| AAD|       SUNRISE|        Canada|     N|
|        18|Aero Servicios Ej...|   \N|null| SII|        ASEISA|        Mexico|     N|
|        19|         Aero Biniza|   \N|null| BZS|        BINIZA|        Mexico|     N|
+----------+--------------------+-----+----+----+--------------+--------------+------+
only showing top 20 rows

The exercise asks us to keep the same number of records after the inner join. However, the table contains many incorrect records and missing values in several columns, and the goal of the exercise is not entirely clear. If I built the dictionary from only the country name plus a country ID assigned by me, the inner join would not work: the original table has no country ID column to join on. The problem also does not say whether I may change the structure of this dataset by adding columns. For these reasons, I will use the airline_id column as the ID in the newly generated CSV, and then perform the inner join on the airline_id column to complete the exercise.

In [8]:
# Two-column projection (airline_id, country) of the airline table; the next cell
# writes it out as dict.csv. Despite its name this is a Spark DataFrame, not a dict.
dict_countries = df1.select(['airline_id', 'country'])
dict_countries.show()
# NOTE(review): leftover scratch note from the author, kept verbatim. These look like
# callsign values, presumably spotted somewhere unexpected - confirm or delete:
# # AEROVARADERO # #ALL STAR# #ASA PESADA# # S# ## AVEMEX# LAP# AIRPAC# AIREX# ODINN# RENTAXEL# ASUR# ATCO# AVIANCA# Oman# UNIFORM OSCAR# ASTORIA
+----------+--------------+
|airline_id|       country|
+----------+--------------+
|        -1|            \N|
|         1|          null|
|         2| United States|
|         3|  South Africa|
|         4|United Kingdom|
|         5|        Russia|
|         6|        Russia|
|         7|        Russia|
|         8|United Kingdom|
|         9| United States|
|        10| United States|
|        11|      Thailand|
|        12|        Canada|
|        13|     Australia|
|        14|     Singapore|
|        15|       Belgium|
|        16|United Kingdom|
|        17|        Canada|
|        18|        Mexico|
|        19|        Mexico|
+----------+--------------+
only showing top 20 rows

Generate the new CSV file, which we call "dict.csv".

In [9]:
# Write the (airline_id, country) projection to dict.csv, renaming airline_id to
# country_id so the join key has a distinct name in the dictionary table.
# The write goes through pandas, presumably so the result is a single plain CSV file.
# toPandas() collects every row to the driver, which is fine for this small table.
file_path = data_path + 'dict.csv'
pandas_dict = dict_countries.toPandas().rename(columns={'airline_id': 'country_id'})
pandas_dict.to_csv(file_path, header=True, index=False)

Read the generated CSV back into Spark.

In [10]:
# Read dict.csv back through Spark. file_path still points at dict.csv from the
# previous cell, and its header row supplies the column names.
df2 = spark.read.csv(file_path, header=True)
df2.show()
+----------+--------------+
|country_id|       country|
+----------+--------------+
|        -1|            \N|
|         1|          null|
|         2| United States|
|         3|  South Africa|
|         4|United Kingdom|
|         5|        Russia|
|         6|        Russia|
|         7|        Russia|
|         8|United Kingdom|
|         9| United States|
|        10| United States|
|        11|      Thailand|
|        12|        Canada|
|        13|     Australia|
|        14|     Singapore|
|        15|       Belgium|
|        16|United Kingdom|
|        17|        Canada|
|        18|        Mexico|
|        19|        Mexico|
+----------+--------------+
only showing top 20 rows

Register both DataFrames as temporary views (tables) so we can perform the inner join with Spark SQL.

In [11]:
# Register both DataFrames as temporary views so they can be joined with Spark SQL.
df1.createOrReplaceTempView('demo')
df2.createOrReplaceTempView('dict')

# Join each airline to its dict.csv row. dict.csv was generated from demo's
# airline_id values (renamed to country_id), so that is the join key.
# The dictionary's `country` is aliased as dict_country: selecting `di.*` would give
# two columns both named `country`, which makes later references by name ambiguous.
df_sql = spark.sql("""
    SELECT d.*,
           di.country_id,
           di.country AS dict_country
    FROM demo AS d
    INNER JOIN dict AS di
        ON d.airline_id = di.country_id
""")
df_sql.show()

# The exercise requires the inner join to keep the same number of records as demo.csv;
# check it explicitly instead of only printing the count.
joined_count = df_sql.count()
expected_count = df1.count()
print(joined_count)
assert joined_count == expected_count, (
    f"inner join returned {joined_count} rows, expected {expected_count}"
)
+----------+--------------------+-----+----+----+--------------+--------------+------+----------+--------------+
|airline_id|                name|alias|iata|icao|      callsign|       country|active|country_id|       country|
+----------+--------------------+-----+----+----+--------------+--------------+------+----------+--------------+
|        -1|             Unknown|   \N|   -| N/A|            \N|            \N|     Y|        -1|            \N|
|         1|      Private flight|   \N|   -| N/A|          null|          null|     Y|         1|          null|
|         2|         135 Airways|   \N|null| GNL|       GENERAL| United States|     N|         2| United States|
|         3|       1Time Airline|   \N|  1T| RNX|       NEXTIME|  South Africa|     Y|         3|  South Africa|
|         4|2 Sqn No 1 Elemen...|   \N|null| WYT|          null|United Kingdom|     N|         4|United Kingdom|
|         5|     213 Flight Unit|   \N|null| TFU|          null|        Russia|     N|         5|        Russia|
|         6|223 Flight Unit S...|   \N|null| CHD|CHKALOVSK-AVIA|        Russia|     N|         6|        Russia|
|         7|   224th Flight Unit|   \N|null| TTF|    CARGO UNIT|        Russia|     N|         7|        Russia|
|         8|         247 Jet Ltd|   \N|null| TWF|  CLOUD RUNNER|United Kingdom|     N|         8|United Kingdom|
|         9|         3D Aviation|   \N|null| SEC|       SECUREX| United States|     N|         9| United States|
|        10|         40-Mile Air|   \N|  Q5| MLA|      MILE-AIR| United States|     Y|        10| United States|
|        11|              4D Air|   \N|null| QRT|       QUARTET|      Thailand|     N|        11|      Thailand|
|        12|611897 Alberta Li...|   \N|null| THD|         DONUT|        Canada|     N|        12|        Canada|
|        13|    Ansett Australia|   \N|  AN| AAA|        ANSETT|     Australia|     Y|        13|     Australia|
|        14|Abacus International|   \N|  1B|null|          null|     Singapore|     Y|        14|     Singapore|
|        15|     Abelag Aviation|   \N|  W9| AAB|           ABG|       Belgium|     N|        15|       Belgium|
|        16|      Army Air Corps|   \N|null| AAC|       ARMYAIR|United Kingdom|     N|        16|United Kingdom|
|        17|Aero Aviation Cen...|   \N|null| AAD|       SUNRISE|        Canada|     N|        17|        Canada|
|        18|Aero Servicios Ej...|   \N|null| SII|        ASEISA|        Mexico|     N|        18|        Mexico|
|        19|         Aero Biniza|   \N|null| BZS|        BINIZA|        Mexico|     N|        19|        Mexico|
+----------+--------------------+-----+----+----+--------------+--------------+------+----------+--------------+
only showing top 20 rows

6162