import findspark
findspark.init()
findspark.find()
import pyspark
findspark.find()
import math
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql import Row
from pyspark.sql.functions import lit
from pyspark.sql.types import StringType
from pyspark.sql import SQLContext
# Start the Spark session and load the raw Profeco price data, then clean
# it in stages: exact duplicates, rows with nulls, and embedded header rows.
spark = SparkSession.builder.appName('SparkSQL_Python_Profeco').getOrCreate()

data_path = 'data/OPI/'
file_path = data_path + 'all_data.csv'

# All columns are read as strings; no schema inference is requested.
df = spark.read.format('csv').option("header", "true").load(file_path)
df.show(10)

# In a script (unlike a notebook) bare expressions are discarded, so the
# column list and the record counts at each cleaning stage are printed.
print(df.columns)
print("Raw records:", df.count())

df = df.drop_duplicates()
print("After dropping duplicate rows:", df.count())

df = df.na.drop()
print("After dropping rows with nulls:", df.count())

# The source file is a concatenation of many CSVs, so repeated header rows
# appear as data; they carry the literal value 'estado' in the estado column.
df = df.filter(df['estado'] != 'estado')
print("After removing embedded header rows:", df.count())
# There are 59,919,903 valid records remaining after cleaning.
# Register the cleaned DataFrame as a temporary view for Spark SQL queries.
df.createOrReplaceTempView('profeco')

# Report the distinct-count results instead of silently discarding them
# (bare .count() expressions do nothing in a script).
print("Distinct categorias:",
      spark.sql("Select distinct categoria from profeco").count())
print("Distinct cadenas comerciales:",
      spark.sql("Select distinct cadenaComercial from profeco").count())
# Most-frequently-recorded producto per estado (ties are all kept).
#
# The original query wrote the (estado, producto) count aggregation twice
# (once to find each estado's max, once to join back).  A CTE plus a
# RANK() window states the aggregation once; RANK() = 1 keeps ties,
# exactly as the max-count join did.
df_sql = spark.sql(
    "with conteo as ( "
    "  select estado, producto, count(*) as cantidad "
    "  from profeco "
    "  group by estado, producto "
    ") "
    "select estado, producto, cantidad "
    "from ( "
    "  select estado, producto, cantidad, "
    "         rank() over (partition by estado order by cantidad desc) as pos "
    "  from conteo "
    ") ranked "
    "where pos = 1")
df_sql.show(100)
# Retail chain offering the widest variety of distinct products.
# NOTE: LIMIT 1 returns a single chain even when several are tied for first.
variety_sql = ("select cadenaComercial, "
               "count(DISTINCT producto) as variedad "
               " from profeco "
               "group by cadenaComercial "
               "order by variedad desc "
               "LIMIT 1 ")
df_sql = spark.sql(variety_sql)
df_sql.show()