"Spark"의 두 판 사이의 차이

오픈소스 비즈니스 컨설팅
둘러보기로 가기 검색하러 가기
 
(같은 사용자의 중간 판 41개는 보이지 않습니다)
14번째 줄: 14번째 줄:
  
 
== Spark 구성 ==
 
== Spark 구성 ==
 +
=== Scala 설치 ===
 +
cd  /install
 +
wget  https://downloads.lightbend.com/scala/2.12.3/scala-2.12.3.tgz
 +
 +
cd  /appl
 +
tar  -xvzf  /install/scala-2.12.3.tgz
 +
mv  scala-2.12.3.tgz  scala
 +
# export PATH=${PATH}:/appl/scala/bin
 +
 
=== Spark 설치 ===
 
=== Spark 설치 ===
  
 
Spark 설치
 
Spark 설치
cd  /
 
mkdir  install
 
mkdir  appl
 
 
 
  cd  /install
 
  cd  /install
 
  wget  http://apache.mirror.cdnetworks.com/spark/spark-2.3.0/spark-2.3.0-bin-hadoop2.7.tgz
 
  wget  http://apache.mirror.cdnetworks.com/spark/spark-2.3.0/spark-2.3.0-bin-hadoop2.7.tgz
32번째 줄: 37번째 줄:
 
  cp  spark-env.sh.template      spark-env.sh
 
  cp  spark-env.sh.template      spark-env.sh
 
  cp  log4j.properties.template  log4j.properties
 
  cp  log4j.properties.template  log4j.properties
 +
 
  vi  spark-env.sh
 
  vi  spark-env.sh
 +
 
  vi  log4j.properties
 
  vi  log4j.properties
 
     log4j.rootCategory=WARN, console
 
     log4j.rootCategory=WARN, console
40번째 줄: 47번째 줄:
 
  # sbin/start-slave.sh  spark://localhost:7077
 
  # sbin/start-slave.sh  spark://localhost:7077
 
  # bin/pyspark  -master  spark://localhost:7077
 
  # bin/pyspark  -master  spark://localhost:7077
 +
 +
cd  /appl/spark
 +
sbin/start-all.sh
 +
bin/pyspark
 +
# bin/spark-shell
  
Scala 설치
+
* Pyspark : http://localhost:4040/
cd  /install
+
* Spark : http://localhost:8080/
wget  https://downloads.lightbend.com/scala/2.12.3/scala-2.12.3.tgz
 
  cd  /appl
 
tar  -xvzf  /install/scala-2.12.3.tgz
 
mv  scala-2.12.3.tgz  scala
 
# export PATH=${PATH}:/appl/scala/bin
 
  
 
=== 폴더 구성 ===
 
=== 폴더 구성 ===
109번째 줄: 116번째 줄:
 
* Spark                          : http://localhost:8080/
 
* Spark                          : http://localhost:8080/
  
=== pyspark 실행 ===
+
=== DataFrame으로 로드 ===
 +
 
 +
RDD (Resilient Distributed Dataset)
 +
 
 +
JSON 파일 로드
 +
df = sqlContext.read.json("file:///home/eduuser/spark/examples/src/main/resources/people.json")
 +
df = sqlContext.read.load("file:///home/eduuser/spark/examples/src/main/resources/people.json", format="json")
 +
 
 +
Text 파일 로드
 +
from pyspark.sql import Row
 +
 +
lines = sc.textFile("file:///home/eduuser/spark/examples/src/main/resources/people.txt")
 +
parts = lines.map(lambda l: l.split(","))
 +
people = parts.map(lambda p: Row(name=p[0], age=int(p[1])))
 +
df = sqlContext.createDataFrame(people)
 +
 
 +
Text 파일 로드 with Schema 지정
 +
from pyspark.sql.types import *
 +
lines = sc.textFile("file:///home/eduuser/spark/examples/src/main/resources/people.txt")
 +
 +
parts = lines.map(lambda l: l.split(","))
 +
people = parts.map(lambda p: (p[0], p[1].strip()))
 +
 +
schemaString = "name age"
 +
fields = [StructField(field_name, StringType(), True) for field_name in schemaString.split()]
 +
schema = StructType(fields)
 +
 +
schemaPeople = sqlContext.createDataFrame(people, schema)
 +
 
 +
Parquet 데이터 로드
 +
df = sqlContext.read.load("file:///home/eduuser/spark/examples/src/main/resources/users.parquet")
 +
 
 +
=== DataFrame을 저장 ===
 +
df.select("name", "favorite_color").write.save("file:///home/eduuser/namesAndFavColors.parquet")
 +
# df = sqlContext.read.load("file:///home/eduuser/namesAndFavColors.parquet")
 +
 +
df.select("name", "age").write.save("file:///home/eduuser/namesAndAges.parquet", format="parquet")
 +
 
 +
=== DataFrame을 TempTable로 지정 ===
 +
df.registerTempTable("people")
 +
teenagers = sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")
 +
teenagers.show()
 +
 
 +
=== pyspark Sample 1 ===
  
Sample 1
 
 
  help(sqlContext)
 
  help(sqlContext)
     q                              #--- q
+
     q                               
 
   
 
   
 
  df = sqlContext.read.json("file:///home/eduuser/spark/examples/src/main/resources/people.json")
 
  df = sqlContext.read.json("file:///home/eduuser/spark/examples/src/main/resources/people.json")
124번째 줄: 173번째 줄:
 
  quit()
 
  quit()
  
Sample 2
+
=== pyspark Sample 2 ===
 +
 
 
  from pyspark.sql import Row
 
  from pyspark.sql import Row
 +
 
  lines = sc.textFile("file:///home/eduuser/spark/examples/src/main/resources/people.txt")  
 
  lines = sc.textFile("file:///home/eduuser/spark/examples/src/main/resources/people.txt")  
 
 
  parts = lines.map(lambda l: l.split(","))
 
  parts = lines.map(lambda l: l.split(","))
 
  people = parts.map(lambda p: Row(name=p[0], age=int(p[1])))
 
  people = parts.map(lambda p: Row(name=p[0], age=int(p[1])))
136번째 줄: 186번째 줄:
 
  teenagers.show()
 
  teenagers.show()
 
   
 
   
  teenNames = teenagers.map(lambda p: "Name: " + p.name)
+
  # teenNames = teenagers.map(lambda p: "Name: " + p.name)
  for teenName in teenNames.collect():
+
  # for teenName in teenNames.collect():
    print(teenName)
+
#    print(teenName)
 
  quit()
 
  quit()
  
Sample 3
+
=== pyspark Sample 3 ===
 +
 
 
  from pyspark.sql.types import *
 
  from pyspark.sql.types import *
 +
 
  lines = sc.textFile("file:///home/eduuser/spark/examples/src/main/resources/people.txt")
 
  lines = sc.textFile("file:///home/eduuser/spark/examples/src/main/resources/people.txt")
 
 
  parts = lines.map(lambda l: l.split(","))
 
  parts = lines.map(lambda l: l.split(","))
 
  people = parts.map(lambda p: (p[0], p[1].strip()))
 
  people = parts.map(lambda p: (p[0], p[1].strip()))
151번째 줄: 202번째 줄:
 
  fields = [StructField(field_name, StringType(), True) for field_name in schemaString.split()]
 
  fields = [StructField(field_name, StringType(), True) for field_name in schemaString.split()]
 
  schema = StructType(fields)
 
  schema = StructType(fields)
 
 
  schemaPeople = sqlContext.createDataFrame(people, schema)
 
  schemaPeople = sqlContext.createDataFrame(people, schema)
 
   
 
   
158번째 줄: 208번째 줄:
 
  teenagers.show()
 
  teenagers.show()
 
   
 
   
  teenNames = teenagers.map(lambda p: "Name: " + p.name)
+
  # teenNames = teenagers.map(lambda p: "Name: " + p.name)
  for teenName in teenNames.collect():
+
  # for teenName in teenNames.collect():
    print(teenName)
+
#    print(teenName)
 
  quit()
 
  quit()
 +
 +
=== pyspark Sample 4 ===
 +
 +
#--- 데이터 로드/저장
 +
df = sqlContext.read.load("file:///home/eduuser/spark/examples/src/main/resources/users.parquet")
 +
#--- 저장시 /home/eduuser/namesAndFavColors.parquet/ 폴더가 생성 됩니다.
 +
df.select("name", "favorite_color").write.save("file:///home/eduuser/namesAndFavColors.parquet")
 +
df = sqlContext.read.load("file:///home/eduuser/namesAndFavColors.parquet")
 +
 +
df = sqlContext.read.load("file:///home/eduuser/spark/examples/src/main/resources/people.json", format="json")
 +
df.select("name", "age").write.save("file:///home/eduuser/namesAndAges.parquet", format="parquet")
 +
 +
=== DataSet 다운로드 ===
 +
 +
cd  ~
 +
cd  nia_kbig
 +
[eduuser@localhost nia_kbig]$ ./datasetDownload.sh
 +
다운로드받을 데이터셋 코드를 입력하세요.
 +
    8h7k4
 +
    ka988
 +
    z24nt
 +
 +
cd  view/basic
 +
ls  -alF
 +
    -rwxr-xr-x. 1 eduuser eduuser  216 2014-12-14 08:46 01.move_data_file.sh*
 +
    -rwxr-xr-x. 1 eduuser eduuser  283 2014-12-04 22:22 01.move_data_file.sh~*
 +
    -rwxr-xr-x. 1 eduuser eduuser  270 2014-12-12 16:20 jeju_2010.csv*
 +
    -rwxr-xr-x. 1 eduuser eduuser  273 2014-12-12 16:16 jeju_2011.csv*
 +
    -rwxr-xr-x. 1 eduuser eduuser  278 2014-12-12 16:16 jeju_2012.csv*
 +
    -rwxr-xr-x. 1 eduuser eduuser 1476 2015-01-11 22:59 view_basic_analysis.r*
 +
 +
# hdfs dfs -mkdir /user
 +
# hdfs dfs -mkdir /user/eduuser/
 +
# hdfs dfs -put jeju* /user/eduuser/
 +
 +
=== pyspark Sample 5 ===
 +
from pyspark.sql import Row
 +
 +
lines = sc.textFile('file:///home/eduuser/nia_kbig/view/basic/jeju_2010.csv')
 +
parts = lines.map(lambda l: l.split(','))
 +
jeju2010 = parts.map(lambda p: Row(IN=p[0], OUT=p[1], INCOME=p[2]))
 +
schema2010 = sqlContext.createDataFrame(jeju2010)
 +
 
 +
schema2010.registerTempTable("jeju2010")
 +
schema2010.show()
 +
 +
sqlContext.sql("select * from jeju2010 where INCOME != 'INCOME'").show()
 +
jeju2010 = sqlContext.sql("select * from jeju2010 where INCOME != 'INCOME'")
 +
jenu2010.show()
 +
 +
=== pyspark Sample 6 ===
 +
 +
from pyspark.sql.types import *
 +
 +
lines = sc.textFile('file:///home/eduuser/nia_kbig/view/basic/jeju_2011.csv')
 +
parts = lines.map(lambda l: l.split(','))
 +
jeju2011 = parts.map(lambda p: (p[0].strip(), p[1].strip(), p[2].strip()))
 +
schemaString = "IN OUT INCOME"
 +
fields = [StructField(field_name, StringType(), True) for field_name in schemaString.split()]
 +
schema = StructType(fields)
 +
schema2011 = sqlContext.createDataFrame(jeju2011, schema)
 +
 +
schema2011.registerTempTable('jeju2011')
 +
schema2011.show()
 +
 +
=== pyspark Sample 7 ===
 +
 +
from pyspark.sql.types import *
 +
 +
#--- 데이터 로드
 +
lines = sc.textFile('file:///home/eduuser/nia_kbig/view/basic/jeju_2010.csv')
 +
parts = lines.map(lambda l: l.split(','))
 +
jeju2010 = parts.map(lambda p: Row(IN=p[0].strip(), OUT=p[1].strip(), INCOME=p[2].strip()))
 +
schema2010 = sqlContext.createDataFrame(jeju2010)
 +
 +
schema2010.registerTempTable('jeju2010')
 +
schema2010.show()
 +
 +
lines = sc.textFile('file:///home/eduuser/nia_kbig/view/basic/jeju_2011.csv')
 +
parts = lines.map(lambda l: l.split(','))
 +
jeju2011 = parts.map(lambda p: Row(IN=p[0].strip(), OUT=p[1].strip(), INCOME=p[2].strip()))
 +
schema2011 = sqlContext.createDataFrame(jeju2011)
 +
 +
schema2011.registerTempTable('jeju2011')
 +
schema2011.show()
 +
 +
lines = sc.textFile('file:///home/eduuser/nia_kbig/view/basic/jeju_2012.csv')
 +
parts = lines.map(lambda l: l.split(','))
 +
jeju2012 = parts.map(lambda p: Row(IN=p[0].strip(), OUT=p[1].strip(), INCOME=p[2].strip()))
 +
schema2012 = sqlContext.createDataFrame(jeju2012)
 +
 +
schema2012.registerTempTable('jeju2012')
 +
schema2012.show()
 +
 
 +
#--- 데이터 수정
 +
jeju2010 = sqlContext.sql("select * from jeju2010 where INCOME != 'INCOME'")
 +
jeju2011 = sqlContext.sql("select * from jeju2011 where INCOME != 'INCOME'")
 +
jeju2012 = sqlContext.sql("select * from jeju2012 where INCOME != 'INCOME'")
 +
 +
tmp = jeju2010.map(lambda p: Row(YEAR=2010, IN=p[0], OUT=p[2], INCOME=p[1]))
 +
jeju2010 = sqlContext.createDataFrame(tmp)
 +
tmp = jeju2011.map(lambda p: Row(YEAR=2010, IN=p[0], OUT=p[2], INCOME=p[1]))
 +
jeju2011 = sqlContext.createDataFrame(tmp)
 +
tmp = jeju2012.map(lambda p: Row(YEAR=2010, IN=p[0], OUT=p[2], INCOME=p[1]))
 +
jeju2012 = sqlContext.createDataFrame(tmp)
 +
 +
jeju = jeju2010.unionAll(jeju2011).unionAll(jeju2012)
 +
tmp = jeju.map(lambda p: Row(YEAR=int(p[3]), IN=p[0], OUT=p[1], INCOME=p[2]))
 +
jeju2= sqlContext.createDataFrame(tmp)
 +
 +
#--- 기술통계
 +
jeju2.describe().show()
 +
jeju2.groupBy('YEAR').avg().show()
 +
jeju2.select('IN', 'INCOME', 'YEAR').groupBy('YEAR').mean().show()
 +
jeju2.select('IN', 'OUT', 'YEAR').groupBy('YEAR').mean().show()
 +
 +
#--- 상관계수
 +
jeju2.corr('IN', 'OUT')
 +
 +
#--- 회귀분석
 +
import numpy as np
 +
import numpy.linalg as lin
 +
 +
X = np.array(jeju2.select('IN', 'OUT').collect())
 +
Y = np.array(jeju2.select('INCOME').collect())
 +
 +
Beta0 = np.dot(lin.inv(np.dot(X.T, X), np.dot(X.T, Y))
 +
 +
X1 = np.hstack([np.array([np.ones(36)]).T, X])
 +
Beta1 = np.dot(lin.inv(np.dot(X1.T, X1)), np.dot(X1.T, Y))
 +
 +
#--- 결정계수
 +
R0 = np.sum((np.dot(X, Beta0) - np.mean(Y))**2) / np.sum((Y - np.mean(Y)))**2)
 +
R1 = np.sum((np.dot(X1, Beta1) - np.mean(Y))**2 / npsum((Y - np.mean(Y)))**2)
 +
 +
adR0 = 1 - (1 - R0) * (36 - 1) / (36 - 2 - 1)
 +
adR1 = 1 - (1 - R1) * (36 - 1) / (36 - 3 - 1.)
 +
 +
#--- 시각화
 +
import matplotlib.pylab as plt
 +
plt.scatter(X[:, 0], Y)
 +
plt.show()
 +
plt.scatter(X[:, 1], Y)
 +
plt.show()
 +
 +
Beta_IN = np.dot(Lininv(np.dot(X1[:, [0, 1]].T, X1[:, [0, 1]])), np.dot(X1[:, [0, 1]].T, Y))
 +
y = Beta_IN[0] + Beta_IN[1] * X1[:, 1]
 +
 +
plt.scatter(X1[:, 1], Y)
 +
plt.plot(X1[:, 1], Y)
 +
plt.show()
 +
 +
plt.scatter(X1[:, 1], Y, label = 'Row Data')
 +
plt.plot(X1[:, 1], y, label = 'Fitted', color = 'red')
 +
plt.legend(loc = 'upper left')
 +
plt.show()
 +
 +
from mpl_toolkits.mplot3d import Axes3D
 +
from matplotlib import cm
 +
 +
fig = plt.figure()
 +
ax = Axes3D(fig)
 +
y = Beta1[0] + Beta1[1] * X[:, 0] + Beta1[2] * X[:, 1]
 +
XX1, XX2 = np.meshgrid(X[:, 0], X[:, 1])
 +
YY = Beta1[0] + Beta[1] * XX1 + Beata1[2] * XX2
 +
ax.plot(X[:, 0], X[:, 1], y, linestyle = 'none', marker = 'o', markerfacecolor = 'blue')
 +
ax.plot_surface(XX1, XX2, YY, rstride = 1, cstride = 1, cmap = 'hot')
  
 
== 참고 문헌 ==
 
== 참고 문헌 ==
  
 
*[[Esper|Esper]]
 
*[[Esper|Esper]]
 +
*[https://wikidocs.net/book/1686 Spark SQL]
 
*[http://www.hanb.co.kr/network/view.html?bi_id=1840 http://www.hanb.co.kr/network/view.html?bi_id=1840]
 
*[http://www.hanb.co.kr/network/view.html?bi_id=1840 http://www.hanb.co.kr/network/view.html?bi_id=1840]
 
*[http://ampcamp.berkeley.edu/big-data-mini-course/ http://ampcamp.berkeley.edu/big-data-mini-course/]
 
*[http://ampcamp.berkeley.edu/big-data-mini-course/ http://ampcamp.berkeley.edu/big-data-mini-course/]

Spark

This page summarizes Spark, an in-memory cluster computing framework.

  • API : Java, Scala, Python, R

Spark Overview

Apache Spark is an open-source engine for large-scale distributed data processing and analytics, developed at UC Berkeley's AMPLab. It became a top-level Apache Software Foundation project in February 2014.

  • Ships with companion components such as an interactive query engine (Shark), a large-scale graph processing engine (Bagel), and a real-time stream processor (Spark Streaming)

Spark Setup

Scala Installation

cd  /install
wget  https://downloads.lightbend.com/scala/2.12.3/scala-2.12.3.tgz

cd  /appl
tar  -xvzf  /install/scala-2.12.3.tgz
mv  scala-2.12.3  scala
# export PATH=${PATH}:/appl/scala/bin

Spark Installation

cd  /install
wget  http://apache.mirror.cdnetworks.com/spark/spark-2.3.0/spark-2.3.0-bin-hadoop2.7.tgz

cd  /appl
tar  -xvzf  /install/spark-2.3.0-bin-hadoop2.7.tgz
mv  spark-2.3.0-bin-hadoop2.7  spark

cd  /appl/spark
cd  conf
cp  spark-env.sh.template      spark-env.sh
cp  log4j.properties.template  log4j.properties

vi  spark-env.sh

vi  log4j.properties
    log4j.rootCategory=WARN, console

# cd /appl/spark
# sbin/start-master.sh
# sbin/start-slave.sh  spark://localhost:7077
# bin/pyspark  --master  spark://localhost:7077

cd  /appl/spark
sbin/start-all.sh
bin/pyspark
# bin/spark-shell

  • Pyspark (application UI) : http://localhost:4040/
  • Spark (master UI)        : http://localhost:8080/
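
Once pyspark is up, a quick way to confirm the installation works is to run a trivial job from the shell. A minimal sketch, assuming the sc and sqlContext objects that the Spark 2.3 pyspark shell creates by default:

#--- sanity check inside the pyspark shell
sc.version                              # e.g. u'2.3.0'
sc.parallelize(range(100)).sum()        # runs a small job; expect 4950
sqlContext                              # SQLContext provided by the shell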

Folder Layout

  • R/
  • bin/
  • conf/
  • data/
  • examples/
  • jars/
  • kubernetes/
  • licenses/
  • python/
  • sbin/
  • yarn/

K-ICT Training

Spark Installation

cd  ~
mkdir  install

cd  ~/install
wget  http://apache.mirror.cdnetworks.com/spark/spark-2.3.0/spark-2.3.0-bin-hadoop2.7.tgz

cd  ~  
tar  -xvzf  ~/install/spark-2.3.0-bin-hadoop2.7.tgz
mv  spark-2.3.0-bin-hadoop2.7  spark

cd  ~/spark
cd  conf
cp  spark-env.sh.template      spark-env.sh
cp  log4j.properties.template  log4j.properties
vi  spark-env.sh
    export LANG=ko_KR.UTF-8

    export JAVA_HOME=/usr/lib/jvm/jre-1.7.0-openjdk.x86_64
    export PATH=$PATH:$JAVA_HOME/bin

    export HADOOP_INSTALL=/usr/local/hadoop
    export HADOOP_MAPRED_HOME=$HADOOP_INSTALL
    export HADOOP_COMMON_HOME=$HADOOP_INSTALL
    export HADOOP_HDFS_HOME=$HADOOP_INSTALL
    export YARN_HOME=$HADOOP_INSTALL
    export HADOOP_COMMON_LIB_NATIVE_DIR=$HADOOP_INSTALL/lib/native
    export PATH=$PATH:$HADOOP_INSTALL/sbin
    export PATH=$PATH:$HADOOP_INSTALL/bin

    export SPARK_DIST_CLASSPATH=$(hadoop classpath)

vi  log4j.properties
    log4j.rootCategory=WARN, console
	
cd ~/spark
sbin/start-all.sh
bin/pyspark

Loading Data into a DataFrame

Spark's core abstraction is the RDD (Resilient Distributed Dataset); the DataFrame API used below builds on top of it.
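
As a small illustration of how the two relate (a sketch assuming the shell's built-in sc and sqlContext), an RDD can be converted to a DataFrame and back:

#--- RDD -> DataFrame -> RDD round trip (illustrative data)
rdd = sc.parallelize([("kim", 29), ("lee", 41)])          # plain RDD of tuples
df  = sqlContext.createDataFrame(rdd, ["name", "age"])    # DataFrame with named columns
df.show()
df.rdd.first()                                            # back to an RDD of Row objects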

Loading a JSON file

df = sqlContext.read.json("file:///home/eduuser/spark/examples/src/main/resources/people.json")
df = sqlContext.read.load("file:///home/eduuser/spark/examples/src/main/resources/people.json", format="json")

Loading a text file

from pyspark.sql import Row

lines = sc.textFile("file:///home/eduuser/spark/examples/src/main/resources/people.txt") 
parts = lines.map(lambda l: l.split(","))
people = parts.map(lambda p: Row(name=p[0], age=int(p[1])))
df = sqlContext.createDataFrame(people)

Loading a text file with an explicit schema

from pyspark.sql.types import *
lines = sc.textFile("file:///home/eduuser/spark/examples/src/main/resources/people.txt")

parts = lines.map(lambda l: l.split(","))
people = parts.map(lambda p: (p[0], p[1].strip()))

schemaString = "name age"
fields = [StructField(field_name, StringType(), True) for field_name in schemaString.split()]
schema = StructType(fields)

schemaPeople = sqlContext.createDataFrame(people, schema)

Loading Parquet data

df = sqlContext.read.load("file:///home/eduuser/spark/examples/src/main/resources/users.parquet")

Saving a DataFrame

df.select("name", "favorite_color").write.save("file:///home/eduuser/namesAndFavColors.parquet")
# df = sqlContext.read.load("file:///home/eduuser/namesAndFavColors.parquet")

df.select("name", "age").write.save("file:///home/eduuser/namesAndAges.parquet", format="parquet")

Registering a DataFrame as a Temp Table

df.registerTempTable("people")
teenagers = sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")
teenagers.show()
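
registerTempTable still works in Spark 2.3 but is deprecated; the Spark 2.x name for the same operation is createOrReplaceTempView. A sketch of the newer form:

#--- Spark 2.x equivalent of registerTempTable
df.createOrReplaceTempView("people")
sqlContext.sql("SELECT name FROM people WHERE age BETWEEN 13 AND 19").show()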

pyspark Sample 1

help(sqlContext)
    q                               #--- press q to quit the help pager

df = sqlContext.read.json("file:///home/eduuser/spark/examples/src/main/resources/people.json")
df.show()
df.printSchema()
df.select("name").show()
df.select(df['name'], df['age'] + 1).show()
df.filter(df['age'] > 21).show()
df.groupBy("age").count().show()
quit()
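
Sample 1 goes through sqlContext, which the shell keeps for backward compatibility; the Spark 2.x pyspark shell also exposes a SparkSession named spark that can do the same work. A minimal sketch of the equivalent calls:

#--- same load and filter through the Spark 2.x SparkSession
df = spark.read.json("file:///home/eduuser/spark/examples/src/main/resources/people.json")
df.filter(df['age'] > 21).show()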

pyspark Sample 2

from pyspark.sql import Row

lines = sc.textFile("file:///home/eduuser/spark/examples/src/main/resources/people.txt") 
parts = lines.map(lambda l: l.split(","))
people = parts.map(lambda p: Row(name=p[0], age=int(p[1])))
schemaPeople = sqlContext.createDataFrame(people)

schemaPeople.registerTempTable("people")
teenagers = sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")
teenagers.show()

# teenNames = teenagers.map(lambda p: "Name: " + p.name)
# for teenName in teenNames.collect():
#     print(teenName)
quit()
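
The commented-out lines above are Spark 1.x style, where a DataFrame still exposed map() directly; in PySpark 2.x the same loop goes through the underlying RDD. A sketch of the 2.x form:

#--- Spark 2.x: map over Rows via the DataFrame's RDD
teenNames = teenagers.rdd.map(lambda p: "Name: " + p.name)
for teenName in teenNames.collect():
    print(teenName)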

pyspark Sample 3

from pyspark.sql.types import *

lines = sc.textFile("file:///home/eduuser/spark/examples/src/main/resources/people.txt")
parts = lines.map(lambda l: l.split(","))
people = parts.map(lambda p: (p[0], p[1].strip()))

schemaString = "name age"
fields = [StructField(field_name, StringType(), True) for field_name in schemaString.split()]
schema = StructType(fields)
schemaPeople = sqlContext.createDataFrame(people, schema)

schemaPeople.registerTempTable("people")
teenagers = sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")
teenagers.show()

# teenNames = teenagers.map(lambda p: "Name: " + p.name)
# for teenName in teenNames.collect():
#    print(teenName)
quit()

pyspark Sample 4

#--- Load/save data
df = sqlContext.read.load("file:///home/eduuser/spark/examples/src/main/resources/users.parquet")
#--- On save, the directory /home/eduuser/namesAndFavColors.parquet/ is created.
df.select("name", "favorite_color").write.save("file:///home/eduuser/namesAndFavColors.parquet")
df = sqlContext.read.load("file:///home/eduuser/namesAndFavColors.parquet")

df = sqlContext.read.load("file:///home/eduuser/spark/examples/src/main/resources/people.json", format="json")
df.select("name", "age").write.save("file:///home/eduuser/namesAndAges.parquet", format="parquet")

Dataset Download

cd  ~
cd  nia_kbig
[eduuser@localhost nia_kbig]$ ./datasetDownload.sh
Enter the dataset code to download.
    8h7k4
    ka988
    z24nt

cd  view/basic
ls  -alF
    -rwxr-xr-x. 1 eduuser eduuser  216 2014-12-14 08:46 01.move_data_file.sh*
    -rwxr-xr-x. 1 eduuser eduuser  283 2014-12-04 22:22 01.move_data_file.sh~*
    -rwxr-xr-x. 1 eduuser eduuser  270 2014-12-12 16:20 jeju_2010.csv*
    -rwxr-xr-x. 1 eduuser eduuser  273 2014-12-12 16:16 jeju_2011.csv*
    -rwxr-xr-x. 1 eduuser eduuser  278 2014-12-12 16:16 jeju_2012.csv*
    -rwxr-xr-x. 1 eduuser eduuser 1476 2015-01-11 22:59 view_basic_analysis.r*

# hdfs dfs -mkdir /user
# hdfs dfs -mkdir /user/eduuser/
# hdfs dfs -put jeju* /user/eduuser/
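
If the jeju files are pushed into HDFS with the commented commands above, the samples below can read them from HDFS instead of the local filesystem. A minimal sketch, assuming HDFS is running and the files landed under /user/eduuser/:

#--- HDFS counterpart of the file:// paths used in the samples
lines = sc.textFile("hdfs:///user/eduuser/jeju_2010.csv")
lines.take(3)          # first few raw CSV lines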

pyspark Sample 5

from pyspark.sql import Row

lines = sc.textFile('file:///home/eduuser/nia_kbig/view/basic/jeju_2010.csv')
parts = lines.map(lambda l: l.split(','))
jeju2010 = parts.map(lambda p: Row(IN=p[0], OUT=p[1], INCOME=p[2]))
schema2010 = sqlContext.createDataFrame(jeju2010)
 
schema2010.registerTempTable("jeju2010")
schema2010.show()

sqlContext.sql("select * from jeju2010 where INCOME != 'INCOME'").show()
jeju2010 = sqlContext.sql("select * from jeju2010 where INCOME != 'INCOME'")
jeju2010.show()
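
Sample 5 drops the header row with a SQL filter; the CSV reader added in Spark 2.x can handle the header and the column types directly. A sketch of that shortcut, assuming the same file:

#--- Spark 2.x CSV reader: header handling and type inference in one call
jeju2010 = sqlContext.read.csv('file:///home/eduuser/nia_kbig/view/basic/jeju_2010.csv',
                               header=True, inferSchema=True)
jeju2010.show()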

pyspark Sample 6

from pyspark.sql.types import *

lines = sc.textFile('file:///home/eduuser/nia_kbig/view/basic/jeju_2011.csv')
parts = lines.map(lambda l: l.split(','))
jeju2011 = parts.map(lambda p: (p[0].strip(), p[1].strip(), p[2].strip()))
schemaString = "IN OUT INCOME"
fields = [StructField(field_name, StringType(), True) for field_name in schemaString.split()]
schema = StructType(fields)
schema2011 = sqlContext.createDataFrame(jeju2011, schema)

schema2011.registerTempTable('jeju2011')
schema2011.show()

pyspark Sample 7

from pyspark.sql import Row
from pyspark.sql.types import *

#--- Load data
lines = sc.textFile('file:///home/eduuser/nia_kbig/view/basic/jeju_2010.csv')
parts = lines.map(lambda l: l.split(','))
jeju2010 = parts.map(lambda p: Row(IN=p[0].strip(), OUT=p[1].strip(), INCOME=p[2].strip()))
schema2010 = sqlContext.createDataFrame(jeju2010)

schema2010.registerTempTable('jeju2010')
schema2010.show()

lines = sc.textFile('file:///home/eduuser/nia_kbig/view/basic/jeju_2011.csv')
parts = lines.map(lambda l: l.split(','))
jeju2011 = parts.map(lambda p: Row(IN=p[0].strip(), OUT=p[1].strip(), INCOME=p[2].strip()))
schema2011 = sqlContext.createDataFrame(jeju2011)

schema2011.registerTempTable('jeju2011')
schema2011.show()

lines = sc.textFile('file:///home/eduuser/nia_kbig/view/basic/jeju_2012.csv')
parts = lines.map(lambda l: l.split(','))
jeju2012 = parts.map(lambda p: Row(IN=p[0].strip(), OUT=p[1].strip(), INCOME=p[2].strip()))
schema2012 = sqlContext.createDataFrame(jeju2012)

schema2012.registerTempTable('jeju2012')
schema2012.show()
 
#--- Clean and reshape data
jeju2010 = sqlContext.sql("select * from jeju2010 where INCOME != 'INCOME'")
jeju2011 = sqlContext.sql("select * from jeju2011 where INCOME != 'INCOME'")
jeju2012 = sqlContext.sql("select * from jeju2012 where INCOME != 'INCOME'")

tmp = jeju2010.rdd.map(lambda p: Row(YEAR=2010, IN=p.IN, OUT=p.OUT, INCOME=p.INCOME))
jeju2010 = sqlContext.createDataFrame(tmp)
tmp = jeju2011.rdd.map(lambda p: Row(YEAR=2011, IN=p.IN, OUT=p.OUT, INCOME=p.INCOME))
jeju2011 = sqlContext.createDataFrame(tmp)
tmp = jeju2012.rdd.map(lambda p: Row(YEAR=2012, IN=p.IN, OUT=p.OUT, INCOME=p.INCOME))
jeju2012 = sqlContext.createDataFrame(tmp)

jeju = jeju2010.unionAll(jeju2011).unionAll(jeju2012)
#--- cast every column to a numeric type so describe(), corr() and the numpy code below work
tmp = jeju.rdd.map(lambda p: Row(YEAR=int(p.YEAR), IN=float(p.IN), OUT=float(p.OUT), INCOME=float(p.INCOME)))
jeju2 = sqlContext.createDataFrame(tmp)

#--- 기술통계
jeju2.describe().show()
jeju2.groupBy('YEAR').avg().show()
jeju2.select('IN', 'INCOME', 'YEAR').groupBy('YEAR').mean().show()
jeju2.select('IN', 'OUT', 'YEAR').groupBy('YEAR').mean().show()

#--- Correlation coefficient
jeju2.corr('IN', 'OUT')

#--- Regression analysis (ordinary least squares via the normal equations)
import numpy as np
import numpy.linalg as lin

X = np.array(jeju2.select('IN', 'OUT').collect()) 
Y = np.array(jeju2.select('INCOME').collect())

Beta0 = np.dot(lin.inv(np.dot(X.T, X)), np.dot(X.T, Y))

X1 = np.hstack([np.array([np.ones(36)]).T, X])     #--- prepend an intercept column of ones (36 observations)
Beta1 = np.dot(lin.inv(np.dot(X1.T, X1)), np.dot(X1.T, Y))
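
The normal-equations estimate above can be cross-checked with numpy's least-squares solver; a small sketch reusing the same X1 and Y:

#--- cross-check the normal-equations solution with numpy's least-squares solver
Beta1_check, _, _, _ = lin.lstsq(X1, Y, rcond=None)
Beta1_check            # should match Beta1 up to numerical precision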

#--- Coefficient of determination (R²)
R0 = np.sum((np.dot(X, Beta0) - np.mean(Y))**2) / np.sum((Y - np.mean(Y))**2)
R1 = np.sum((np.dot(X1, Beta1) - np.mean(Y))**2) / np.sum((Y - np.mean(Y))**2)

adR0 = 1 - (1 - R0) * (36 - 1) / (36 - 2 - 1)
adR1 = 1 - (1 - R1) * (36 - 1) / (36 - 3 - 1.)

#--- Visualization
import matplotlib.pyplot as plt
plt.scatter(X[:, 0], Y)
plt.show()
plt.scatter(X[:, 1], Y)
plt.show()

Beta_IN = np.dot(lin.inv(np.dot(X1[:, [0, 1]].T, X1[:, [0, 1]])), np.dot(X1[:, [0, 1]].T, Y))
y = Beta_IN[0] + Beta_IN[1] * X1[:, 1]

plt.scatter(X1[:, 1], Y)
plt.plot(X1[:, 1], y)
plt.show()

plt.scatter(X1[:, 1], Y, label = 'Raw Data')
plt.plot(X1[:, 1], y, label = 'Fitted', color = 'red')
plt.legend(loc = 'upper left')
plt.show()

from mpl_toolkits.mplot3d import Axes3D
from matplotlib import cm

fig = plt.figure()
ax = Axes3D(fig)
y = Beta1[0] + Beta1[1] * X[:, 0] + Beta1[2] * X[:, 1]
XX1, XX2 = np.meshgrid(X[:, 0], X[:, 1])
YY = Beta1[0] + Beta1[1] * XX1 + Beta1[2] * XX2
ax.plot(X[:, 0], X[:, 1], y, linestyle = 'none', marker = 'o', markerfacecolor = 'blue')
ax.plot_surface(XX1, XX2, YY, rstride = 1, cstride = 1, cmap = 'hot')
plt.show()

References

  • Esper
  • Spark SQL : https://wikidocs.net/book/1686
  • http://www.hanb.co.kr/network/view.html?bi_id=1840
  • http://ampcamp.berkeley.edu/big-data-mini-course/