
spark-scala-python

############ SparkContext ############

SparkContext was the entry point used in earlier Spark 1.x code:

// in Scala

import org.apache.spark.SparkConf

import org.apache.spark.SparkContext

val conf = new SparkConf().setAppName("first").setMaster("local[*]")

val sc = new SparkContext(conf)

val rdd1 = sc.textFile("C:/workspace/data/txns")

# in Python

from pyspark import SparkContext, SparkConf

conf = SparkConf().setAppName("first").setMaster("local[*]")

sc = SparkContext(conf=conf)

    

## Nowadays a SparkSession is used as the entry point
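A minimal sketch of creating one in Python (the app name and master simply mirror the SparkContext example above):

# in Python

from pyspark.sql import SparkSession

spark = (SparkSession.builder
    .appName("first")
    .master("local[*]")
    .getOrCreate())

sc = spark.sparkContext  # the underlying SparkContext is still available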





########range#########

// in Scala

val myRange = spark.range(1000).toDF("number")

# in Python

myRange = spark.range(1000).toDF("number")


###########where##########

// in Scala

val divisBy2 = myRange.where("number % 2 = 0")

# in Python

divisBy2 = myRange.where("number % 2 = 0")


###########read csv ##########

// in Scala

val flightData2015 = spark

.read

.option("inferSchema", "true")

.option("header", "true")

.csv("/data/flight-data/csv/2015-summary.csv")


# in Python

flightData2015 = spark\

.read\

.option("inferSchema", "true")\

.option("header", "true")\

.csv("/data/flight-data/csv/2015-summary.csv")


########## sql table ##########

flightData2015.createOrReplaceTempView("flight_data_2015")


// in Scala

val sqlWay = spark.sql("""

SELECT DEST_COUNTRY_NAME, count(1)

FROM flight_data_2015

GROUP BY DEST_COUNTRY_NAME

""")

val dataFrameWay = flightData2015

.groupBy('DEST_COUNTRY_NAME)

.count()

sqlWay.explain

dataFrameWay.explain


# in Python

sqlWay = spark.sql("""

SELECT DEST_COUNTRY_NAME, count(1)

FROM flight_data_2015

GROUP BY DEST_COUNTRY_NAME

""")

dataFrameWay = flightData2015\

.groupBy("DEST_COUNTRY_NAME")\

.count()

sqlWay.explain()

dataFrameWay.explain()


###########max######################

spark.sql("SELECT max(count) from flight_data_2015").take(1)

// in Scala

import org.apache.spark.sql.functions.max

flightData2015.select(max("count")).take(1)


# in Python

from pyspark.sql.functions import max

flightData2015.select(max("count")).take(1)


############ top 5 destinations (sum of count) ############

// in Scala

val maxSql = spark.sql("""

SELECT DEST_COUNTRY_NAME, sum(count) as destination_total

FROM flight_data_2015

GROUP BY DEST_COUNTRY_NAME

ORDER BY sum(count) DESC

LIMIT 5

""")

maxSql.show()

# in Python

maxSql = spark.sql("""

SELECT DEST_COUNTRY_NAME, sum(count) as destination_total

FROM flight_data_2015

GROUP BY DEST_COUNTRY_NAME

ORDER BY sum(count) DESC

LIMIT 5

""")

maxSql.show()


#############groupBy and sort ################

// in Scala

import org.apache.spark.sql.functions.desc

flightData2015

.groupBy("DEST_COUNTRY_NAME")

.sum("count")

.withColumnRenamed("sum(count)", "destination_total")

.sort(desc("destination_total"))

.limit(5)

.show()

# in Python

from pyspark.sql.functions import desc

flightData2015\

.groupBy("DEST_COUNTRY_NAME")\

.sum("count")\

.withColumnRenamed("sum(count)", "destination_total")\

.sort(desc("destination_total"))\

.limit(5)\

.show()


############ dataset #################

// in Scala

case class Flight(DEST_COUNTRY_NAME: String,

ORIGIN_COUNTRY_NAME: String,

count: BigInt)

val flightsDF = spark.read

.parquet("/data/flight-data/parquet/2010-summary.parquet/")

val flights = flightsDF.as[Flight]

###################### dataset filter ###############

// in Scala

flights

.filter(flight_row => flight_row.ORIGIN_COUNTRY_NAME != "Canada")

.map(flight_row => flight_row)

.take(5)

flights

.take(5)

.filter(flight_row => flight_row.ORIGIN_COUNTRY_NAME != "Canada")

.map(fr => Flight(fr.DEST_COUNTRY_NAME, fr.ORIGIN_COUNTRY_NAME, fr.count + 5))


################# range to DataFrame ##########

// in Scala

val df = spark.range(500).toDF("number")

df.select(df.col("number") + 10)

# in Python

df = spark.range(500).toDF("number")

df.select(df["number"] + 10)


############ read file ###############


// in Scala

val df = spark.read.format("json")

.load("/data/flight-data/json/2015-summary.json")

# in Python

df = spark.read.format("json").load("/data/flight-data/json/2015-summary.json")

df.printSchema()


############ define schema #########

// in Scala

import org.apache.spark.sql.types.{StructField, StructType, StringType, LongType}

import org.apache.spark.sql.types.Metadata

val myManualSchema = StructType(Array(

StructField("DEST_COUNTRY_NAME", StringType, true),

StructField("ORIGIN_COUNTRY_NAME", StringType, true),

StructField("count", LongType, false,

Metadata.fromJson("{\"hello\":\"world\"}"))

))

val df = spark.read.format("json").schema(myManualSchema)

.load("/data/flight-data/json/2015-summary.json")


# Here’s how to do the same in Python:

# in Python

from pyspark.sql.types import StructField, StructType, StringType, LongType

myManualSchema = StructType([

StructField("DEST_COUNTRY_NAME", StringType(), True),

StructField("ORIGIN_COUNTRY_NAME", StringType(), True),

StructField("count", LongType(), False, metadata={"hello":"world"})

])

df = spark.read.format("json").schema(myManualSchema)\

.load("/data/flight-data/json/2015-summary.json")


########### expr ###########

// in Scala

import org.apache.spark.sql.functions.expr

expr("(((someCol + 5) * 200) - 6) < otherCol")

# in Python

from pyspark.sql.functions import expr

expr("(((someCol + 5) * 200) - 6) < otherCol")


######### columns #########

spark.read.format("json").load("/data/flight-data/json/2015-summary.json")

.columns


############ creating rows #############

// in Scala

import org.apache.spark.sql.Row

val myRow = Row("Hello", null, 1, false)

# in Python

from pyspark.sql import Row

myRow = Row("Hello", None, 1, False)


// in Scala

myRow(0) // type Any

myRow(0).asInstanceOf[String] // String

myRow.getString(0) // String

myRow.getInt(2) // Int

# in Python

myRow[0]

myRow[2]


#################create dataframe###########

// in Scala

val df = spark.read.format("json")

.load("/data/flight-data/json/2015-summary.json")

df.createOrReplaceTempView("dfTable")


# in Python

df = spark.read.format("json").load("/data/flight-data/json/2015-summary.json")

df.createOrReplaceTempView("dfTable")



########## select column ###########

// in Scala

df.select("DEST_COUNTRY_NAME").show(2)

# in Python

df.select("DEST_COUNTRY_NAME").show(2)

-- in SQL

SELECT DEST_COUNTRY_NAME FROM dfTable LIMIT 2


// in Scala

df.select("DEST_COUNTRY_NAME", "ORIGIN_COUNTRY_NAME").show(2)

# in Python

df.select("DEST_COUNTRY_NAME", "ORIGIN_COUNTRY_NAME").show(2)

-- in SQL

SELECT DEST_COUNTRY_NAME, ORIGIN_COUNTRY_NAME FROM dfTable LIMIT 2


############ column and expression ############

// in Scala

import org.apache.spark.sql.functions.{expr, col, column}

df.select(

df.col("DEST_COUNTRY_NAME"),

col("DEST_COUNTRY_NAME"),

column("DEST_COUNTRY_NAME"),

'DEST_COUNTRY_NAME,

$"DEST_COUNTRY_NAME",

expr("DEST_COUNTRY_NAME"))

.show(2)


# in Python

from pyspark.sql.functions import expr, col, column

df.select(

expr("DEST_COUNTRY_NAME"),

col("DEST_COUNTRY_NAME"),

column("DEST_COUNTRY_NAME"))\

.show(2)


// in Scala

df.select(expr("DEST_COUNTRY_NAME AS destination")).show(2)

# in Python

df.select(expr("DEST_COUNTRY_NAME AS destination")).show(2)

-- in SQL

SELECT DEST_COUNTRY_NAME as destination FROM dfTable LIMIT 2


// in Scala

df.selectExpr("DEST_COUNTRY_NAME as newColumnName", "DEST_COUNTRY_NAME").show(2)

# in Python

df.selectExpr("DEST_COUNTRY_NAME as newColumnName", "DEST_COUNTRY_NAME").show(2)


################# selectExpr ############

// in Scala

df.selectExpr(

"*", // include all original columns

"(DEST_COUNTRY_NAME = ORIGIN_COUNTRY_NAME) as withinCountry")

.show(2)

# in Python

df.selectExpr(

"*", # all original columns

"(DEST_COUNTRY_NAME = ORIGIN_COUNTRY_NAME) as withinCountry")\

.show(2)

-- in SQL

SELECT *, (DEST_COUNTRY_NAME = ORIGIN_COUNTRY_NAME) as withinCountry

FROM dfTable

LIMIT 2


// in Scala

df.selectExpr("avg(count)", "count(distinct(DEST_COUNTRY_NAME))").show(2)

# in Python

df.selectExpr("avg(count)", "count(distinct(DEST_COUNTRY_NAME))").show(2)

-- in SQL

SELECT avg(count), count(distinct(DEST_COUNTRY_NAME)) FROM dfTable LIMIT 2



########### lit ###########

// in Scala

import org.apache.spark.sql.functions.lit

df.select(expr("*"), lit(1).as("One")).show(2)

# in Python

from pyspark.sql.functions import lit

df.select(expr("*"), lit(1).alias("One")).show(2)

In SQL, literals are just the specific value:

-- in SQL

SELECT *, 1 as One FROM dfTable LIMIT 2


############# add column ###########


// in Scala

df.withColumn("numberOne", lit(1)).show(2)

# in Python

df.withColumn("numberOne", lit(1)).show(2)

-- in SQL

SELECT *, 1 as numberOne FROM dfTable LIMIT 2


######## renaming columns ###########

// in Scala

df.withColumnRenamed("DEST_COUNTRY_NAME", "dest").columns

# in Python

df.withColumnRenamed("DEST_COUNTRY_NAME", "dest").columns

... dest, ORIGIN_COUNTRY_NAME, count


########### removing columns ###########

df.drop("ORIGIN_COUNTRY_NAME").columns

We can drop multiple columns by passing in multiple columns as arguments:

df.drop("ORIGIN_COUNTRY_NAME", "DEST_COUNTRY_NAME")


############# where ########

// in Scala

df.where(col("count") < 2).where(col("ORIGIN_COUNTRY_NAME") =!= "Croatia")

.show(2)

# in Python

df.where(col("count") < 2).where(col("ORIGIN_COUNTRY_NAME") != "Croatia")\

.show(2)

-- in SQL

SELECT * FROM dfTable WHERE count < 2 AND ORIGIN_COUNTRY_NAME != "Croatia"

LIMIT 2


######### distinct ############

// in Scala

df.select("ORIGIN_COUNTRY_NAME", "DEST_COUNTRY_NAME").distinct().count()

# in Python

df.select("ORIGIN_COUNTRY_NAME", "DEST_COUNTRY_NAME").distinct().count()

-- in SQL

SELECT COUNT(DISTINCT(ORIGIN_COUNTRY_NAME, DEST_COUNTRY_NAME)) FROM dfTable

Results in 256.

// in Scala

df.select("ORIGIN_COUNTRY_NAME").distinct().count()

# in Python

df.select("ORIGIN_COUNTRY_NAME").distinct().count()

-- in SQL

SELECT COUNT(DISTINCT ORIGIN_COUNTRY_NAME) FROM dfTable


######### random samples ###############

// in Scala

val seed = 5

val withReplacement = false

val fraction = 0.5

df.sample(withReplacement, fraction, seed).count()

# in Python

seed = 5

withReplacement = False

fraction = 0.5

df.sample(withReplacement, fraction, seed).count()


######## random split #########

// in Scala

val dataFrames = df.randomSplit(Array(0.25, 0.75), seed)

dataFrames(0).count() > dataFrames(1).count() // False

# in Python

dataFrames = df.randomSplit([0.25, 0.75], seed)

dataFrames[0].count() > dataFrames[1].count() # False



############ union ###########

// in Scala

import org.apache.spark.sql.Row

val schema = df.schema

val newRows = Seq(

Row("New Country", "Other Country", 5L),

Row("New Country 2", "Other Country 3", 1L)

)

val parallelizedRows = spark.sparkContext.parallelize(newRows)

val newDF = spark.createDataFrame(parallelizedRows, schema)

df.union(newDF)

.where("count = 1")

.where($"ORIGIN_COUNTRY_NAME" =!= "United States")

.show() // get all of them and we'll see our new rows at the end

In Scala, you must use the =!= operator so that you don’t just compare the unevaluated column

expression to a string but instead to the evaluated one:

# in Python

from pyspark.sql import Row

schema = df.schema

newRows = [

Row("New Country", "Other Country", 5L),

Row("New Country 2", "Other Country 3", 1L)

]

parallelizedRows = spark.sparkContext.parallelize(newRows)

newDF = spark.createDataFrame(parallelizedRows, schema)

# in Python

df.union(newDF)\

.where("count = 1")\

.where(col("ORIGIN_COUNTRY_NAME") != "United States")\

.show()


########### sort ############

// in Scala

df.sort("count").show(5)

df.orderBy("count", "DEST_COUNTRY_NAME").show(5)

df.orderBy(col("count"), col("DEST_COUNTRY_NAME")).show(5)

# in Python

df.sort("count").show(5)

df.orderBy("count", "DEST_COUNTRY_NAME").show(5)

df.orderBy(col("count"), col("DEST_COUNTRY_NAME")).show(5)

To more explicitly specify sort direction, you need to use the asc and desc functions if operating

on a column. These allow you to specify the order in which a given column should be sorted:

// in Scala

import org.apache.spark.sql.functions.{desc, asc}

df.orderBy(expr("count desc")).show(2)

df.orderBy(desc("count"), asc("DEST_COUNTRY_NAME")).show(2)

# in Python

from pyspark.sql.functions import desc, asc

df.orderBy(expr("count desc")).show(2)

df.orderBy(col("count").desc(), col("DEST_COUNTRY_NAME").asc()).show(2)

-- in SQL

SELECT * FROM dfTable ORDER BY count DESC, DEST_COUNTRY_NAME ASC LIMIT 2


############ getNumPartitions ###########

// in Scala

df.rdd.getNumPartitions // 1

# in Python

df.rdd.getNumPartitions() # 1


########## repartition and coalesce##################


// in Scala

df.repartition(5)

# in Python

df.repartition(5)

If you know that you’re going to be filtering by a certain column often, it can be worth

repartitioning based on that column:

// in Scala

df.repartition(col("DEST_COUNTRY_NAME"))

# in Python

df.repartition(col("DEST_COUNTRY_NAME"))

You can optionally specify the number of partitions you would like, too:

// in Scala

df.repartition(5, col("DEST_COUNTRY_NAME"))

# in Python

df.repartition(5, col("DEST_COUNTRY_NAME"))

Coalesce, on the other hand, will not incur a full shuffle and will try to combine partitions. This

operation will shuffle your data into five partitions based on the destination country name, and

then coalesce them (without a full shuffle):

// in Scala

df.repartition(5, col("DEST_COUNTRY_NAME")).coalesce(2)

# in Python

df.repartition(5, col("DEST_COUNTRY_NAME")).coalesce(2)



################## limit #############

// in Scala

val collectDF = df.limit(10)

collectDF.take(5) // take works with an Integer count

collectDF.show() // this prints it out nicely

collectDF.show(5, false)

collectDF.collect()


# in Python

collectDF = df.limit(10)

collectDF.take(5) # take works with an Integer count

collectDF.show() # this prints it out nicely

collectDF.show(5, False)

collectDF.collect()


################## toLocalIterator() ############

The method toLocalIterator collects partitions to the driver as an iterator. This

method allows you to iterate over the entire dataset partition-by-partition in a serial manner:

collectDF.toLocalIterator()
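A small sketch of actually consuming the iterator (the column names assume the flight-data DataFrame loaded earlier):

# in Python

for row in collectDF.toLocalIterator():
    print(row["DEST_COUNTRY_NAME"], row["count"])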


*****************************************************

############### DATA TYPES ##############
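Note that from here on df refers to a different DataFrame than the flight data: a retail dataset with columns such as InvoiceNo, StockCode, Description, Quantity, UnitPrice, and CustomerId. A minimal sketch of loading it; the path is an assumption, adjust it to wherever your retail CSV lives:

# in Python

df = spark.read.format("csv")\
    .option("header", "true")\
    .option("inferSchema", "true")\
    .load("/data/retail-data/by-day/2010-12-01.csv")  # assumed sample path

df.createOrReplaceTempView("dfTable")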

##############lit #######################

// in Scala

import org.apache.spark.sql.functions.lit

df.select(lit(5), lit("five"), lit(5.0))

# in Python

from pyspark.sql.functions import lit

df.select(lit(5), lit("five"), lit(5.0))

There’s no equivalent function necessary in SQL, so we can use the values directly:

-- in SQL

SELECT 5, "five", 5.0


######### numbers #########

// in Scala

import org.apache.spark.sql.functions.{expr, pow}

val fabricatedQuantity = pow(col("Quantity") * col("UnitPrice"), 2) + 5

df.select(expr("CustomerId"), fabricatedQuantity.alias("realQuantity")).show(2)

# in Python

from pyspark.sql.functions import expr, pow

fabricatedQuantity = pow(col("Quantity") * col("UnitPrice"), 2) + 5

df.select(expr("CustomerId"), fabricatedQuantity.alias("realQuantity")).show(2)


########### selectExpr ###########


// in Scala

df.selectExpr(

"CustomerId",

"(POWER((Quantity * UnitPrice), 2.0) + 5) as realQuantity").show(2)

# in Python

df.selectExpr(

"CustomerId",

"(POWER((Quantity * UnitPrice), 2.0) + 5) as realQuantity").show(2)

-- in SQL

SELECT customerId, (POWER((Quantity * UnitPrice), 2.0) + 5) as realQuantity

FROM dfTable


######### round ####################


// in Scala

import org.apache.spark.sql.functions.{round, bround}

df.select(round(col("UnitPrice"), 1).alias("rounded"), col("UnitPrice")).show(5)

By default, the round function rounds up if you’re exactly in between two numbers. You can

round down by using the bround:

// in Scala

import org.apache.spark.sql.functions.lit

df.select(round(lit("2.5")), bround(lit("2.5"))).show(2)

# in Python

from pyspark.sql.functions import lit, round, bround

df.select(round(lit("2.5")), bround(lit("2.5"))).show(2)

-- in SQL

SELECT round(2.5), bround(2.5)


########## correlation of 2 columns #################

// in Scala

import org.apache.spark.sql.functions.{corr}

df.stat.corr("Quantity", "UnitPrice")

df.select(corr("Quantity", "UnitPrice")).show()

# in Python

from pyspark.sql.functions import corr

df.stat.corr("Quantity", "UnitPrice")

df.select(corr("Quantity", "UnitPrice")).show()

-- in SQL

SELECT corr(Quantity, UnitPrice) FROM dfTable


############ describe ############


// in Scala

df.describe().show()

# in Python

df.describe().show()


################ crosstab and freqItems ###########

// in Scala

df.stat.crosstab("StockCode", "Quantity").show()

# in Python

df.stat.crosstab("StockCode", "Quantity").show()

// in Scala

df.stat.freqItems(Seq("StockCode", "Quantity")).show()

# in Python

df.stat.freqItems(["StockCode", "Quantity"]).show()


############## monotonically_increasing_id ##############

// in Scala

import org.apache.spark.sql.functions.monotonically_increasing_id

df.select(monotonically_increasing_id()).show(2)

# in Python

from pyspark.sql.functions import monotonically_increasing_id

df.select(monotonically_increasing_id()).show(2)


################# string ###############


################## initcap ###########

initcap capitalizes the first letter of every space-separated word in a string:

// in Scala

import org.apache.spark.sql.functions.{initcap}

df.select(initcap(col("Description"))).show(2, false)

# in Python

from pyspark.sql.functions import initcap

df.select(initcap(col("Description"))).show()

-- in SQL

SELECT initcap(Description) FROM dfTable


---------- upper and lower ----------

// in Scala

import org.apache.spark.sql.functions.{lower, upper}

df.select(col("Description"),

lower(col("Description")),

upper(lower(col("Description")))).show(2)

# in Python

from pyspark.sql.functions import lower, upper

df.select(col("Description"),

lower(col("Description")),

upper(lower(col("Description")))).show(2)

-- in SQL

SELECT Description, lower(Description), Upper(lower(Description)) FROM dfTable


############### remove spaces ##########

// in Scala

import org.apache.spark.sql.functions.{lit, ltrim, rtrim, rpad, lpad, trim}

df.select(

ltrim(lit(" HELLO ")).as("ltrim"),

rtrim(lit(" HELLO ")).as("rtrim"),

trim(lit(" HELLO ")).as("trim"),

lpad(lit("HELLO"), 3, " ").as("lp"),

rpad(lit("HELLO"), 10, " ").as("rp")).show(2)

# in Python

from pyspark.sql.functions import lit, ltrim, rtrim, rpad, lpad, trim

df.select(

ltrim(lit(" HELLO ")).alias("ltrim"),

rtrim(lit(" HELLO ")).alias("rtrim"),

trim(lit(" HELLO ")).alias("trim"),

lpad(lit("HELLO"), 3, " ").alias("lp"),

rpad(lit("HELLO"), 10, " ").alias("rp")).show(2)

-- in SQL

SELECT

ltrim(' HELLLOOOO '),

rtrim(' HELLLOOOO '),

trim(' HELLLOOOO '),

lpad('HELLOOOO ', 3, ' '),

rpad('HELLOOOO ', 10, ' ')

FROM dfTable


################# Regular Expressions #############

############## replacing strings (regexp_replace) ############

// in Scala

import org.apache.spark.sql.functions.regexp_replace

val simpleColors = Seq("black", "white", "red", "green", "blue")

val regexString = simpleColors.map(_.toUpperCase).mkString("|")

// the | signifies `OR` in regular expression syntax

df.select(

regexp_replace(col("Description"), regexString, "COLOR").alias("color_clean"),

col("Description")).show(2)

# in Python

from pyspark.sql.functions import regexp_replace

regex_string = "BLACK|WHITE|RED|GREEN|BLUE"

df.select(

regexp_replace(col("Description"), regex_string, "COLOR").alias("color_clean"),

col("Description")).show(2)

-- in SQL

SELECT

regexp_replace(Description, 'BLACK|WHITE|RED|GREEN|BLUE', 'COLOR') as

color_clean, Description

FROM dfTable


------------- replacing characters (translate) ---------

// in Scala

import org.apache.spark.sql.functions.translate

df.select(translate(col("Description"), "LEET", "1337"), col("Description"))

.show(2)

# in Python

from pyspark.sql.functions import translate

df.select(translate(col("Description"), "LEET", "1337"),col("Description"))\

.show(2)

-- in SQL

SELECT translate(Description, 'LEET', '1337'), Description FROM dfTable


----- extracting strings from a column (regexp_extract) -------

// in Scala

import org.apache.spark.sql.functions.regexp_extract

val regexString = simpleColors.map(_.toUpperCase).mkString("(", "|", ")")

// the | signifies OR in regular expression syntax

df.select(

regexp_extract(col("Description"), regexString, 1).alias("color_clean"),

col("Description")).show(2)

# in Python

from pyspark.sql.functions import regexp_extract

extract_str = "(BLACK|WHITE|RED|GREEN|BLUE)"

df.select(

regexp_extract(col("Description"), extract_str, 1).alias("color_clean"),

col("Description")).show(2)

-- in SQL

SELECT regexp_extract(Description, '(BLACK|WHITE|RED|GREEN|BLUE)', 1),

Description

FROM dfTable


----------- existence of a string --------

// in Scala

val containsBlack = col("Description").contains("BLACK")

val containsWhite = col("DESCRIPTION").contains("WHITE")

df.withColumn("hasSimpleColor", containsBlack.or(containsWhite))

.where("hasSimpleColor")

.select("Description").show(3, false)

In Python and SQL, we can use the instr function:

# in Python

from pyspark.sql.functions import instr

containsBlack = instr(col("Description"), "BLACK") >= 1

containsWhite = instr(col("Description"), "WHITE") >= 1

df.withColumn("hasSimpleColor", containsBlack | containsWhite)\

.where("hasSimpleColor")\

.select("Description").show(3, False)

-- in SQL

SELECT Description FROM dfTable

WHERE instr(Description, 'BLACK') >= 1 OR instr(Description, 'WHITE') >= 1


################### select arbitrary columns #########

// in Scala

val simpleColors = Seq("black", "white", "red", "green", "blue")

val selectedColumns = simpleColors.map(color => {

col("Description").contains(color.toUpperCase).alias(s"is_$color")

}):+expr("*") // could also append this value

df.select(selectedColumns:_*).where(col("is_white").or(col("is_red")))

.select("Description").show(3, false)


# in Python

from pyspark.sql.functions import expr, locate

simpleColors = ["black", "white", "red", "green", "blue"]

def color_locator(column, color_string):
    return locate(color_string.upper(), column)\
        .cast("boolean")\
        .alias("is_" + color_string)

selectedColumns = [color_locator(df.Description, c) for c in simpleColors]

selectedColumns.append(expr("*")) # has to a be Column type

df.select(*selectedColumns).where(expr("is_white OR is_red"))\

.select("Description").show(3, False)


####################### dates and times ##########

// in Scala 

import org.apache.spark.sql.functions.{current_date, current_timestamp}

val dateDF = spark.range(10)

.withColumn("today", current_date())

.withColumn("now", current_timestamp())

dateDF.createOrReplaceTempView("dateTable")


# in Python

from pyspark.sql.functions import current_date, current_timestamp

dateDF = spark.range(10)\

.withColumn("today", current_date())\

.withColumn("now", current_timestamp())

dateDF.createOrReplaceTempView("dateTable")

dateDF.printSchema()


############# date_add and date_sub ##########

// in Scala

import org.apache.spark.sql.functions.{date_add, date_sub}

dateDF.select(date_sub(col("today"), 5), date_add(col("today"), 5)).show(1)

# in Python

from pyspark.sql.functions import date_add, date_sub

dateDF.select(date_sub(col("today"), 5), date_add(col("today"), 5)).show(1)

-- in SQL

SELECT date_sub(today, 5), date_add(today, 5) FROM dateTable


################ datediff, months_between, to_date ##########

// in Scala

import org.apache.spark.sql.functions.{datediff, months_between, to_date}

dateDF.withColumn("week_ago", date_sub(col("today"), 7))

.select(datediff(col("week_ago"), col("today"))).show(1)

dateDF.select(

to_date(lit("2016-01-01")).alias("start"),

to_date(lit("2017-05-22")).alias("end"))

.select(months_between(col("start"), col("end"))).show(1)


# in Python

from pyspark.sql.functions import datediff, months_between, to_date

dateDF.withColumn("week_ago", date_sub(col("today"), 7))\

.select(datediff(col("week_ago"), col("today"))).show(1)

dateDF.select(

to_date(lit("2016-01-01")).alias("start"),

to_date(lit("2017-05-22")).alias("end"))\

.select(months_between(col("start"), col("end"))).show(1)

-- in SQL

SELECT to_date('2016-01-01'), months_between('2016-01-01', '2017-01-01'),

datediff('2016-01-01', '2017-01-01')

FROM dateTable


####### string to date ###########

// in Scala

import org.apache.spark.sql.functions.{to_date, lit}

spark.range(5).withColumn("date", lit("2017-01-01"))

.select(to_date(col("date"))).show(1)

# in Python

from pyspark.sql.functions import to_date, lit

spark.range(5).withColumn("date", lit("2017-01-01"))\

.select(to_date(col("date"))).show(1)


######## set date format ##########

// in Scala

import org.apache.spark.sql.functions.to_date

val dateFormat = "yyyy-dd-MM"

val cleanDateDF = spark.range(1).select(

to_date(lit("2017-12-11"), dateFormat).alias("date"),

to_date(lit("2017-20-12"), dateFormat).alias("date2"))

cleanDateDF.createOrReplaceTempView("dateTable2")

# in Python

from pyspark.sql.functions import to_date

dateFormat = "yyyy-dd-MM"

cleanDateDF = spark.range(1).select(

to_date(lit("2017-12-11"), dateFormat).alias("date"),

to_date(lit("2017-20-12"), dateFormat).alias("date2"))

cleanDateDF.createOrReplaceTempView("dateTable2")

-- in SQL

SELECT to_date(date, 'yyyy-dd-MM'), to_date(date2, 'yyyy-dd-MM'), to_date(date)

FROM dateTable2


########### timestamp ###########

// in Scala

import org.apache.spark.sql.functions.to_timestamp

cleanDateDF.select(to_timestamp(col("date"), dateFormat)).show()

# in Python

from pyspark.sql.functions import to_timestamp

cleanDateDF.select(to_timestamp(col("date"), dateFormat)).show()

-- in SQL

SELECT to_timestamp(date, 'yyyy-dd-MM'), to_timestamp(date2, 'yyyy-dd-MM')

FROM dateTable2



################# NULL DATA #################

###########coalesce #############

// in Scala

import org.apache.spark.sql.functions.coalesce

df.select(coalesce(col("Description"), col("CustomerId"))).show()

# in Python

from pyspark.sql.functions import coalesce

df.select(coalesce(col("Description"), col("CustomerId"))).show()


###########ifnull, nullIf, nvl, and nvl2########
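These SQL helpers choose between values based on nullness: ifnull and nvl return the second value when the first is null, nullif returns null when the two values are equal, and nvl2 returns the second value when the first is not null and the third otherwise. A minimal sketch, run through spark.sql against the dfTable view registered earlier (the literal values are placeholders):

# in Python

spark.sql("""
SELECT
  ifnull(null, 'return_value'),
  nullif('value', 'value'),
  nvl(null, 'return_value'),
  nvl2('not_null', 'return_value', 'else_value')
FROM dfTable LIMIT 1
""").show()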



############### drop ##########

df.na.drop()

df.na.drop("any")


--In SQL, we have to do this column by column:

-- in SQL

SELECT * FROM dfTable WHERE Description IS NOT NULL


-- to drop a row only if all of its values are null

df.na.drop("all")


################ fill #############

--na.fill()

df.na.fill("All Null values become this string")


// in Scala

df.na.fill(5, Seq("StockCode", "InvoiceNo"))

# in Python

df.na.fill("all", subset=["StockCode", "InvoiceNo"])



// in Scala

val fillColValues = Map("StockCode" -> 5, "Description" -> "No Value")

df.na.fill(fillColValues)

# in Python

fill_cols_vals = {"StockCode": 5, "Description" : "No Value"}

df.na.fill(fill_cols_vals)


################replace #####################

// in Scala

df.na.replace("Description", Map("" -> "UNKNOWN"))

# in Python

df.na.replace([""], ["UNKNOWN"], "Description")



#################struct ################

df.selectExpr("(Description, InvoiceNo) as complex", "*")

df.selectExpr("struct(Description, InvoiceNo) as complex", "*")

// in Scala

import org.apache.spark.sql.functions.struct

val complexDF = df.select(struct("Description", "InvoiceNo").alias("complex"))

complexDF.createOrReplaceTempView("complexDF")

# in Python

from pyspark.sql.functions import struct

complexDF = df.select(struct("Description", "InvoiceNo").alias("complex"))

complexDF.createOrReplaceTempView("complexDF")

We now have a DataFrame with a column complex. We can query it just as we might another

DataFrame, the only difference is that we use a dot syntax to do so, or the column method

getField:

complexDF.select("complex.Description")

complexDF.select(col("complex").getField("Description"))

We can also query all values in the struct by using *. This brings up all the columns to the top-level DataFrame:

complexDF.select("complex.*")

-- in SQL

SELECT complex.* FROM complexDF


############# split into array #################

// in Scala

import org.apache.spark.sql.functions.split

df.select(split(col("Description"), " ")).show(2)

# in Python

from pyspark.sql.functions import split

df.select(split(col("Description"), " ")).show(2)

-- in SQL

SELECT split(Description, ' ') FROM dfTable


// in Scala

df.select(split(col("Description"), " ").alias("array_col"))

.selectExpr("array_col[0]").show(2)

# in Python

df.select(split(col("Description"), " ").alias("array_col"))\

.selectExpr("array_col[0]").show(2)

-- in SQL

SELECT split(Description, ' ')[0] FROM dfTable


########### array length ##############

// in Scala

import org.apache.spark.sql.functions.size

df.select(size(split(col("Description"), " "))).show(2) // shows 5 and 3

# in Python

from pyspark.sql.functions import size

df.select(size(split(col("Description"), " "))).show(2) # shows 5 and 3


############ array contains #############

We can also see whether this array contains a value:

// in Scala

import org.apache.spark.sql.functions.array_contains

df.select(array_contains(split(col("Description"), " "), "WHITE")).show(2)

# in Python

from pyspark.sql.functions import array_contains

df.select(array_contains(split(col("Description"), " "), "WHITE")).show(2)

-- in SQL

SELECT array_contains(split(Description, ' '), 'WHITE') FROM dfTable


############### explode ###########

The explode function takes a column that consists of arrays and creates one row (with the rest of the values duplicated) per value in the array:

// in Scala

import org.apache.spark.sql.functions.{split, explode}

df.withColumn("splitted", split(col("Description"), " "))

.withColumn("exploded", explode(col("splitted")))

.select("Description", "InvoiceNo", "exploded").show(2)


# in Python

from pyspark.sql.functions import split, explode

df.withColumn("splitted", split(col("Description"), " "))\

.withColumn("exploded", explode(col("splitted")))\

.select("Description", "InvoiceNo", "exploded").show(2)

-- in SQL

SELECT Description, InvoiceNo, exploded

FROM (SELECT *, split(Description, " ") as splitted FROM dfTable)

LATERAL VIEW explode(splitted) as exploded


############ Maps #############

Maps are created by using the map function and key-value pairs of columns. You then can select

them just like you might select from an array:

// in Scala

import org.apache.spark.sql.functions.map

df.select(map(col("Description"), col("InvoiceNo")).alias("complex_map")).show(2)

# in Python

from pyspark.sql.functions import create_map

df.select(create_map(col("Description"), col("InvoiceNo")).alias("complex_map"))\

.show(2)

-- in SQL

SELECT map(Description, InvoiceNo) as complex_map FROM dfTable

WHERE Description IS NOT NULL


############# querying map values #############

// in Scala

df.select(map(col("Description"), col("InvoiceNo")).alias("complex_map"))

.selectExpr("complex_map['WHITE METAL LANTERN']").show(2)

# in Python

df.select(create_map(col("Description"), col("InvoiceNo")).alias("complex_map"))\

.selectExpr("complex_map['WHITE METAL LANTERN']").show(2)



############ json ###########
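The snippets below assume a DataFrame named jsonDF with a single string column, jsonString; a minimal sketch of creating one whose keys match the queries below:

# in Python

jsonDF = spark.range(1).selectExpr("""
  '{"myJSONKey" : {"myJSONValue" : [1, 2, 3]}}' as jsonString""")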


// in Scala

import org.apache.spark.sql.functions.{get_json_object, json_tuple}

jsonDF.select(

get_json_object(col("jsonString"), "$.myJSONKey.myJSONValue[1]") as "column",

json_tuple(col("jsonString"), "myJSONKey")).show(2)

# in Python

from pyspark.sql.functions import get_json_object, json_tuple

jsonDF.select(

get_json_object(col("jsonString"), "$.myJSONKey.myJSONValue[1]").alias("column"),

json_tuple(col("jsonString"), "myJSONKey")).show(2)


Here’s the equivalent using selectExpr:

jsonDF.selectExpr(

"json_tuple(jsonString, '$.myJSONKey.myJSONValue[1]') as column").show(2)


############# to_json ##########

// in Scala

import org.apache.spark.sql.functions.to_json

df.selectExpr("(InvoiceNo, Description) as myStruct")

.select(to_json(col("myStruct")))

# in Python

from pyspark.sql.functions import to_json

df.selectExpr("(InvoiceNo, Description) as myStruct")\

.select(to_json(col("myStruct")))



################ from_json ################

// in Scala

import org.apache.spark.sql.functions.from_json

import org.apache.spark.sql.types._

val parseSchema = new StructType(Array(

new StructField("InvoiceNo",StringType,true),

new StructField("Description",StringType,true)))

df.selectExpr("(InvoiceNo, Description) as myStruct")

.select(to_json(col("myStruct")).alias("newJSON"))

.select(from_json(col("newJSON"), parseSchema), col("newJSON")).show(2)

# in Python

from pyspark.sql.functions import from_json

from pyspark.sql.types import *

parseSchema = StructType((

StructField("InvoiceNo",StringType(),True),

StructField("Description",StringType(),True)))

df.selectExpr("(InvoiceNo, Description) as myStruct")\

.select(to_json(col("myStruct")).alias("newJSON"))\

.select(from_json(col("newJSON"), parseSchema), col("newJSON")).show(2)


################### UDF ##############

// in Scala

val udfExampleDF = spark.range(5).toDF("num")

def power3(number:Double):Double = number * number * number

power3(2.0)


# in Python

udfExampleDF = spark.range(5).toDF("num")

def power3(double_value):
    return double_value ** 3

power3(2.0)



// in Scala

import org.apache.spark.sql.functions.udf

val power3udf = udf(power3(_:Double):Double)

We can use that just like any other DataFrame function:

// in Scala

udfExampleDF.select(power3udf(col("num"))).show()


# in Python

from pyspark.sql.functions import udf

power3udf = udf(power3)

Then, we can use it in our DataFrame code:

# in Python

from pyspark.sql.functions import col

udfExampleDF.select(power3udf(col("num"))).show(2)


######## register function ###########

// in Scala

spark.udf.register("power3", power3(_:Double):Double)

udfExampleDF.selectExpr("power3(num)").show(2)


# in Python

udfExampleDF.selectExpr("power3(num)").show(2)

# registered in Scala


# in Python

from pyspark.sql.types import IntegerType, DoubleType

spark.udf.register("power3py", power3, DoubleType())

# in Python

udfExampleDF.selectExpr("power3py(num)").show(2)

# registered via Python


-- in SQL

SELECT power3(12), power3py(12) -- doesn't work because of return type

-- in SQL

CREATE TEMPORARY FUNCTION myFunc AS 'com.organization.hive.udf.FunctionName'
