"Spark"의 두 판 사이의 차이
둘러보기로 가기
검색하러 가기
잔글 (→pyspark Sample 6) |
잔글 (→pyspark Sample 7) |
||
(같은 사용자의 중간 판 29개는 보이지 않습니다) | |||
14번째 줄: | 14번째 줄: | ||
== Spark 구성 == | == Spark 구성 == | ||
+ | === Scala 설치 === | ||
+ | cd /install | ||
+ | wget https://downloads.lightbend.com/scala/2.12.3/scala-2.12.3.tgz | ||
+ | |||
+ | cd /appl | ||
+ | tar -xvzf /install/scala-2.12.3.tgz | ||
+ | mv scala-2.12.3.tgz scala | ||
+ | # export PATH=${PATH}:/appl/scala/bin | ||
+ | |||
=== Spark 설치 === | === Spark 설치 === | ||
Spark 설치 | Spark 설치 | ||
− | |||
− | |||
− | |||
− | |||
cd /install | cd /install | ||
wget http://apache.mirror.cdnetworks.com/spark/spark-2.3.0/spark-2.3.0-bin-hadoop2.7.tgz | wget http://apache.mirror.cdnetworks.com/spark/spark-2.3.0/spark-2.3.0-bin-hadoop2.7.tgz | ||
32번째 줄: | 37번째 줄: | ||
cp spark-env.sh.template spark-env.sh | cp spark-env.sh.template spark-env.sh | ||
cp log4j.properties.template log4j.properties | cp log4j.properties.template log4j.properties | ||
+ | |||
vi spark-env.sh | vi spark-env.sh | ||
+ | |||
vi log4j.properties | vi log4j.properties | ||
log4j.rootCategory=WARN, console | log4j.rootCategory=WARN, console | ||
40번째 줄: | 47번째 줄: | ||
# sbin/start-slave.sh spark://localhost:7077 | # sbin/start-slave.sh spark://localhost:7077 | ||
# bin/pyspark -master spark://localhost:7077 | # bin/pyspark -master spark://localhost:7077 | ||
+ | |||
+ | cd /appl/spark | ||
+ | sbin/start-all.sh | ||
+ | bin/pyspark | ||
+ | # bin/spark-shell | ||
− | + | * Pyspark : http://localhost:4040/ | |
− | + | * Spark : http://localhost:8080/ | |
− | |||
− | |||
− | |||
− | |||
− | |||
=== 폴더 구성 === | === 폴더 구성 === | ||
108번째 줄: | 115번째 줄: | ||
* Pyspark : http://localhost:4040/ | * Pyspark : http://localhost:4040/ | ||
* Spark : http://localhost:8080/ | * Spark : http://localhost:8080/ | ||
+ | |||
+ | === DataFrame으로 로드 === | ||
+ | |||
+ | RDD (Resilient Distributed Dataset) | ||
+ | |||
+ | JSON 파일 로드 | ||
+ | df = sqlContext.read.json("file:///home/eduuser/spark/examples/src/main/resources/people.json") | ||
+ | df = sqlContext.read.load("file:///home/eduuser/spark/examples/src/main/resources/people.json", format="json") | ||
+ | |||
+ | Text 파일 로드 | ||
+ | from pyspark.sql import Row | ||
+ | |||
+ | lines = sc.textFile("file:///home/eduuser/spark/examples/src/main/resources/people.txt") | ||
+ | parts = lines.map(lambda l: l.split(",")) | ||
+ | people = parts.map(lambda p: Row(name=p[0], age=int(p[1]))) | ||
+ | df = sqlContext.createDataFrame(people) | ||
+ | |||
+ | Text 파일 로드 with Schema 지정 | ||
+ | from pyspark.sql.types import * | ||
+ | lines = sc.textFile("file:///home/eduuser/spark/examples/src/main/resources/people.txt") | ||
+ | |||
+ | parts = lines.map(lambda l: l.split(",")) | ||
+ | people = parts.map(lambda p: (p[0], p[1].strip())) | ||
+ | |||
+ | schemaString = "name age" | ||
+ | fields = [StructField(field_name, StringType(), True) for field_name in schemaString.split()] | ||
+ | schema = StructType(fields) | ||
+ | |||
+ | schemaPeople = sqlContext.createDataFrame(people, schema) | ||
+ | |||
+ | Parquet 데이터 로드 | ||
+ | df = sqlContext.read.load("file:///home/eduuser/spark/examples/src/main/resources/users.parquet") | ||
+ | |||
+ | === DataFrame을 저장 === | ||
+ | df.select("name", "favorite_color").write.save("file:///home/eduuser/namesAndFavColors.parquet") | ||
+ | # df = sqlContext.read.load("file:///home/eduuser/namesAndFavColors.parquet") | ||
+ | |||
+ | df.select("name", "age").write.save("file:///home/eduuser/namesAndAges.parquet", format="parquet") | ||
+ | |||
+ | === DataFrame을 TempTable로 지정 === | ||
+ | df.registerTempTable("people") | ||
+ | teenagers = sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19") | ||
+ | teenagers.show() | ||
=== pyspark Sample 1 === | === pyspark Sample 1 === | ||
help(sqlContext) | help(sqlContext) | ||
− | q | + | q |
df = sqlContext.read.json("file:///home/eduuser/spark/examples/src/main/resources/people.json") | df = sqlContext.read.json("file:///home/eduuser/spark/examples/src/main/resources/people.json") | ||
126번째 줄: | 176번째 줄: | ||
from pyspark.sql import Row | from pyspark.sql import Row | ||
+ | |||
lines = sc.textFile("file:///home/eduuser/spark/examples/src/main/resources/people.txt") | lines = sc.textFile("file:///home/eduuser/spark/examples/src/main/resources/people.txt") | ||
− | |||
parts = lines.map(lambda l: l.split(",")) | parts = lines.map(lambda l: l.split(",")) | ||
people = parts.map(lambda p: Row(name=p[0], age=int(p[1]))) | people = parts.map(lambda p: Row(name=p[0], age=int(p[1]))) | ||
144번째 줄: | 194번째 줄: | ||
from pyspark.sql.types import * | from pyspark.sql.types import * | ||
+ | |||
lines = sc.textFile("file:///home/eduuser/spark/examples/src/main/resources/people.txt") | lines = sc.textFile("file:///home/eduuser/spark/examples/src/main/resources/people.txt") | ||
− | |||
parts = lines.map(lambda l: l.split(",")) | parts = lines.map(lambda l: l.split(",")) | ||
people = parts.map(lambda p: (p[0], p[1].strip())) | people = parts.map(lambda p: (p[0], p[1].strip())) | ||
152번째 줄: | 202번째 줄: | ||
fields = [StructField(field_name, StringType(), True) for field_name in schemaString.split()] | fields = [StructField(field_name, StringType(), True) for field_name in schemaString.split()] | ||
schema = StructType(fields) | schema = StructType(fields) | ||
− | |||
schemaPeople = sqlContext.createDataFrame(people, schema) | schemaPeople = sqlContext.createDataFrame(people, schema) | ||
165번째 줄: | 214번째 줄: | ||
=== pyspark Sample 4 === | === pyspark Sample 4 === | ||
+ | |||
+ | #--- 데이터 로드/저장 | ||
df = sqlContext.read.load("file:///home/eduuser/spark/examples/src/main/resources/users.parquet") | df = sqlContext.read.load("file:///home/eduuser/spark/examples/src/main/resources/users.parquet") | ||
− | df.select("name", "favorite_color").write.save("namesAndFavColors.parquet") | + | #--- 저장시 /home/eduuser/namesAndFavColors.parquet/ 폴더가 생성 됩니다. |
+ | df.select("name", "favorite_color").write.save("file:///home/eduuser/namesAndFavColors.parquet") | ||
+ | df = sqlContext.read.load("file:///home/eduuser/namesAndFavColors.parquet") | ||
df = sqlContext.read.load("file:///home/eduuser/spark/examples/src/main/resources/people.json", format="json") | df = sqlContext.read.load("file:///home/eduuser/spark/examples/src/main/resources/people.json", format="json") | ||
− | df.select("name", "age").write.save("namesAndAges.parquet", format="parquet") | + | df.select("name", "age").write.save("file:///home/eduuser/namesAndAges.parquet", format="parquet") |
=== DataSet 다운로드 === | === DataSet 다운로드 === | ||
190번째 줄: | 243번째 줄: | ||
-rwxr-xr-x. 1 eduuser eduuser 1476 2015-01-11 22:59 view_basic_analysis.r* | -rwxr-xr-x. 1 eduuser eduuser 1476 2015-01-11 22:59 view_basic_analysis.r* | ||
− | hdfs dfs -mkdir /user | + | # hdfs dfs -mkdir /user |
− | hdfs dfs -mkdir /user/eduuser/ | + | # hdfs dfs -mkdir /user/eduuser/ |
− | hdfs dfs -put jeju* /user/eduuser/ | + | # hdfs dfs -put jeju* /user/eduuser/ |
=== pyspark Sample 5 === | === pyspark Sample 5 === | ||
from pyspark.sql import Row | from pyspark.sql import Row | ||
+ | |||
lines = sc.textFile('file:///home/eduuser/nia_kbig/view/basic/jeju_2010.csv') | lines = sc.textFile('file:///home/eduuser/nia_kbig/view/basic/jeju_2010.csv') | ||
parts = lines.map(lambda l: l.split(',')) | parts = lines.map(lambda l: l.split(',')) | ||
− | |||
jeju2010 = parts.map(lambda p: Row(IN=p[0], OUT=p[1], INCOME=p[2])) | jeju2010 = parts.map(lambda p: Row(IN=p[0], OUT=p[1], INCOME=p[2])) | ||
schema2010 = sqlContext.createDataFrame(jeju2010) | schema2010 = sqlContext.createDataFrame(jeju2010) | ||
+ | |||
schema2010.registerTempTable("jeju2010") | schema2010.registerTempTable("jeju2010") | ||
schema2010.show() | schema2010.show() | ||
211번째 줄: | 265번째 줄: | ||
from pyspark.sql.types import * | from pyspark.sql.types import * | ||
+ | |||
lines = sc.textFile('file:///home/eduuser/nia_kbig/view/basic/jeju_2011.csv') | lines = sc.textFile('file:///home/eduuser/nia_kbig/view/basic/jeju_2011.csv') | ||
parts = lines.map(lambda l: l.split(',')) | parts = lines.map(lambda l: l.split(',')) | ||
216번째 줄: | 271번째 줄: | ||
schemaString = "IN OUT INCOME" | schemaString = "IN OUT INCOME" | ||
fields = [StructField(field_name, StringType(), True) for field_name in schemaString.split()] | fields = [StructField(field_name, StringType(), True) for field_name in schemaString.split()] | ||
+ | schema = StructType(fields) | ||
schema2011 = sqlContext.createDataFrame(jeju2011, schema) | schema2011 = sqlContext.createDataFrame(jeju2011, schema) | ||
+ | |||
schema2011.registerTempTable('jeju2011') | schema2011.registerTempTable('jeju2011') | ||
schema2011.show() | schema2011.show() | ||
+ | |||
+ | === pyspark Sample 7 === | ||
+ | |||
+ | from pyspark.sql.types import * | ||
+ | |||
+ | #--- 데이터 로드 | ||
+ | lines = sc.textFile('file:///home/eduuser/nia_kbig/view/basic/jeju_2010.csv') | ||
+ | parts = lines.map(lambda l: l.split(',')) | ||
+ | jeju2010 = parts.map(lambda p: Row(IN=p[0].strip(), OUT=p[1].strip(), INCOME=p[2].strip())) | ||
+ | schema2010 = sqlContext.createDataFrame(jeju2010) | ||
+ | |||
+ | schema2010.registerTempTable('jeju2010') | ||
+ | schema2010.show() | ||
+ | |||
+ | lines = sc.textFile('file:///home/eduuser/nia_kbig/view/basic/jeju_2011.csv') | ||
+ | parts = lines.map(lambda l: l.split(',')) | ||
+ | jeju2011 = parts.map(lambda p: Row(IN=p[0].strip(), OUT=p[1].strip(), INCOME=p[2].strip())) | ||
+ | schema2011 = sqlContext.createDataFrame(jeju2011) | ||
+ | |||
+ | schema2011.registerTempTable('jeju2011') | ||
+ | schema2011.show() | ||
+ | |||
+ | lines = sc.textFile('file:///home/eduuser/nia_kbig/view/basic/jeju_2012.csv') | ||
+ | parts = lines.map(lambda l: l.split(',')) | ||
+ | jeju2012 = parts.map(lambda p: Row(IN=p[0].strip(), OUT=p[1].strip(), INCOME=p[2].strip())) | ||
+ | schema2012 = sqlContext.createDataFrame(jeju2012) | ||
+ | |||
+ | schema2012.registerTempTable('jeju2012') | ||
+ | schema2012.show() | ||
+ | |||
+ | #--- 데이터 수정 | ||
+ | jeju2010 = sqlContext.sql("select * from jeju2010 where INCOME != 'INCOME'") | ||
+ | jeju2011 = sqlContext.sql("select * from jeju2011 where INCOME != 'INCOME'") | ||
+ | jeju2012 = sqlContext.sql("select * from jeju2012 where INCOME != 'INCOME'") | ||
+ | |||
+ | tmp = jeju2010.map(lambda p: Row(YEAR=2010, IN=p[0], OUT=p[2], INCOME=p[1])) | ||
+ | jeju2010 = sqlContext.createDataFrame(tmp) | ||
+ | tmp = jeju2011.map(lambda p: Row(YEAR=2010, IN=p[0], OUT=p[2], INCOME=p[1])) | ||
+ | jeju2011 = sqlContext.createDataFrame(tmp) | ||
+ | tmp = jeju2012.map(lambda p: Row(YEAR=2010, IN=p[0], OUT=p[2], INCOME=p[1])) | ||
+ | jeju2012 = sqlContext.createDataFrame(tmp) | ||
+ | |||
+ | jeju = jeju2010.unionAll(jeju2011).unionAll(jeju2012) | ||
+ | tmp = jeju.map(lambda p: Row(YEAR=int(p[3]), IN=p[0], OUT=p[1], INCOME=p[2])) | ||
+ | jeju2= sqlContext.createDataFrame(tmp) | ||
+ | |||
+ | #--- 기술통계 | ||
+ | jeju2.describe().show() | ||
+ | jeju2.groupBy('YEAR').avg().show() | ||
+ | jeju2.select('IN', 'INCOME', 'YEAR').groupBy('YEAR').mean().show() | ||
+ | jeju2.select('IN', 'OUT', 'YEAR').groupBy('YEAR').mean().show() | ||
+ | |||
+ | #--- 상관계수 | ||
+ | jeju2.corr('IN', 'OUT') | ||
+ | |||
+ | #--- 회귀분석 | ||
+ | import numpy as np | ||
+ | import numpy.linalg as lin | ||
+ | |||
+ | X = np.array(jeju2.select('IN', 'OUT').collect()) | ||
+ | Y = np.array(jeju2.select('INCOME').collect()) | ||
+ | |||
+ | Beta0 = np.dot(lin.inv(np.dot(X.T, X), np.dot(X.T, Y)) | ||
+ | |||
+ | X1 = np.hstack([np.array([np.ones(36)]).T, X]) | ||
+ | Beta1 = np.dot(lin.inv(np.dot(X1.T, X1)), np.dot(X1.T, Y)) | ||
+ | |||
+ | #--- 결정계수 | ||
+ | R0 = np.sum((np.dot(X, Beta0) - np.mean(Y))**2) / np.sum((Y - np.mean(Y)))**2) | ||
+ | R1 = np.sum((np.dot(X1, Beta1) - np.mean(Y))**2 / npsum((Y - np.mean(Y)))**2) | ||
+ | |||
+ | adR0 = 1 - (1 - R0) * (36 - 1) / (36 - 2 - 1) | ||
+ | adR1 = 1 - (1 - R1) * (36 - 1) / (36 - 3 - 1.) | ||
+ | |||
+ | #--- 시각화 | ||
+ | import matplotlib.pylab as plt | ||
+ | plt.scatter(X[:, 0], Y) | ||
+ | plt.show() | ||
+ | plt.scatter(X[:, 1], Y) | ||
+ | plt.show() | ||
+ | |||
+ | Beta_IN = np.dot(Lininv(np.dot(X1[:, [0, 1]].T, X1[:, [0, 1]])), np.dot(X1[:, [0, 1]].T, Y)) | ||
+ | y = Beta_IN[0] + Beta_IN[1] * X1[:, 1] | ||
+ | |||
+ | plt.scatter(X1[:, 1], Y) | ||
+ | plt.plot(X1[:, 1], Y) | ||
+ | plt.show() | ||
+ | |||
+ | plt.scatter(X1[:, 1], Y, label = 'Row Data') | ||
+ | plt.plot(X1[:, 1], y, label = 'Fitted', color = 'red') | ||
+ | plt.legend(loc = 'upper left') | ||
+ | plt.show() | ||
+ | |||
+ | from mpl_toolkits.mplot3d import Axes3D | ||
+ | from matplotlib import cm | ||
+ | |||
+ | fig = plt.figure() | ||
+ | ax = Axes3D(fig) | ||
+ | y = Beta1[0] + Beta1[1] * X[:, 0] + Beta1[2] * X[:, 1] | ||
+ | XX1, XX2 = np.meshgrid(X[:, 0], X[:, 1]) | ||
+ | YY = Beta1[0] + Beta[1] * XX1 + Beata1[2] * XX2 | ||
+ | ax.plot(X[:, 0], X[:, 1], y, linestyle = 'none', marker = 'o', markerfacecolor = 'blue') | ||
+ | ax.plot_surface(XX1, XX2, YY, rstride = 1, cstride = 1, cmap = 'hot') | ||
== 참고 문헌 == | == 참고 문헌 == | ||
*[[Esper|Esper]] | *[[Esper|Esper]] | ||
+ | *[https://wikidocs.net/book/1686 Spark SQL] | ||
*[http://www.hanb.co.kr/network/view.html?bi_id=1840 http://www.hanb.co.kr/network/view.html?bi_id=1840] | *[http://www.hanb.co.kr/network/view.html?bi_id=1840 http://www.hanb.co.kr/network/view.html?bi_id=1840] | ||
*[http://ampcamp.berkeley.edu/big-data-mini-course/ http://ampcamp.berkeley.edu/big-data-mini-course/] | *[http://ampcamp.berkeley.edu/big-data-mini-course/ http://ampcamp.berkeley.edu/big-data-mini-course/] |
Spark
This page summarizes Spark, an in-memory cluster computing framework.
- Homepage : http://spark-project.org/, https://spark.apache.org/
- Download : https://spark.apache.org/downloads.html
- License : Apache License 2.0
- Platform : Scala
- API : Java, Scala, Python, R
Spark Overview
Apache Spark is an open-source engine for large-scale distributed data processing and analysis, developed at UC Berkeley's AMPLab. It has been a top-level Apache Software Foundation project since February 2014.
- Ships with companion components such as an interactive query engine (Shark), a large-scale graph processing engine (Bagel), and a real-time stream processor (Spark Streaming); a minimal streaming sketch follows below.
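To give a feel for the streaming component, here is a minimal word-count sketch over a TCP socket. This is an illustrative addition, not from the original course material; it assumes the pyspark shell (so sc already exists) and a hypothetical test source such as nc -lk 9999 on localhost.

from pyspark.streaming import StreamingContext

#--- 10-second micro-batches over an assumed socket source on localhost:9999
ssc = StreamingContext(sc, 10)
lines = ssc.socketTextStream("localhost", 9999)
counts = lines.flatMap(lambda l: l.split(" ")) \
              .map(lambda w: (w, 1)) \
              .reduceByKey(lambda a, b: a + b)
counts.pprint()                  # print each batch's counts to the console
ssc.start()
ssc.awaitTermination()           # blocks the shell until the stream is stopped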
Spark Setup
Installing Scala
cd /install
wget https://downloads.lightbend.com/scala/2.12.3/scala-2.12.3.tgz

cd /appl
tar -xvzf /install/scala-2.12.3.tgz
mv scala-2.12.3 scala
# export PATH=${PATH}:/appl/scala/bin
Installing Spark

cd /install
wget http://apache.mirror.cdnetworks.com/spark/spark-2.3.0/spark-2.3.0-bin-hadoop2.7.tgz

cd /appl
tar -xvzf /install/spark-2.3.0-bin-hadoop2.7.tgz
mv spark-2.3.0-bin-hadoop2.7 spark

cd /appl/spark/conf
cp spark-env.sh.template spark-env.sh
cp log4j.properties.template log4j.properties

vi spark-env.sh

vi log4j.properties
log4j.rootCategory=WARN, console

# Running master and worker separately:
# cd /appl/spark
# sbin/start-master.sh
# sbin/start-slave.sh spark://localhost:7077
# bin/pyspark --master spark://localhost:7077

cd /appl/spark
sbin/start-all.sh
bin/pyspark
# bin/spark-shell
- PySpark application UI : http://localhost:4040/
- Spark master UI : http://localhost:8080/
Folder Layout
- R/
- bin/
- conf/
- data/
- examples/
- jars/
- kubernetes/
- licenses/
- python/
- sbin/
- yarn/
K-ICT Training
Installing Spark

cd ~
mkdir install
cd ~/install
wget http://apache.mirror.cdnetworks.com/spark/spark-2.3.0/spark-2.3.0-bin-hadoop2.7.tgz

cd ~
tar -xvzf ~/install/spark-2.3.0-bin-hadoop2.7.tgz
mv spark-2.3.0-bin-hadoop2.7 spark

cd ~/spark/conf
cp spark-env.sh.template spark-env.sh
cp log4j.properties.template log4j.properties

vi spark-env.sh
export LANG=ko_KR.UTF-8
export JAVA_HOME=/usr/lib/jvm/jre-1.7.0-openjdk.x86_64
export PATH=$PATH:$JAVA_HOME/bin
export HADOOP_INSTALL=/usr/local/hadoop
export HADOOP_MAPRED_HOME=$HADOOP_INSTALL
export HADOOP_COMMON_HOME=$HADOOP_INSTALL
export HADOOP_HDFS_HOME=$HADOOP_INSTALL
export YARN_HOME=$HADOOP_INSTALL
export HADOOP_COMMON_LIB_NATIVE_DIR=$HADOOP_INSTALL/lib/native
export PATH=$PATH:$HADOOP_INSTALL/sbin
export PATH=$PATH:$HADOOP_INSTALL/bin
export SPARK_DIST_CLASSPATH=$(hadoop classpath)

vi log4j.properties
log4j.rootCategory=WARN, console

cd ~/spark
sbin/start-all.sh
bin/pyspark
- Hadoop Resource Manager : http://localhost:8088/
- Hadoop Node Manager : http://localhost:8042/
- PySpark application UI : http://localhost:4040/
- Spark master UI : http://localhost:8080/
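Once the shell is up, a one-line job can confirm that the installation actually executes work. This smoke test is an addition for verification, not part of the course steps:

#--- Run inside the pyspark shell; sc is created by the shell on startup
sc.parallelize(range(1000)).map(lambda x: x * x).sum()   # expected result: 332833500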
Loading into a DataFrame
Spark's core abstraction is the RDD (Resilient Distributed Dataset); the loaders below either return a DataFrame directly or build one from an RDD of rows.
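For readers new to the RDD API itself, here is a minimal sketch (an addition for context, not from the original page), runnable in the pyspark shell:

#--- Transformations are lazy; actions trigger execution
rdd = sc.parallelize([1, 2, 3, 4, 5])
doubled = rdd.map(lambda x: x * 2)   # transformation: nothing runs yet
doubled.collect()                    # action: returns [2, 4, 6, 8, 10]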
Loading a JSON file
df = sqlContext.read.json("file:///home/eduuser/spark/examples/src/main/resources/people.json")
df = sqlContext.read.load("file:///home/eduuser/spark/examples/src/main/resources/people.json", format="json")
Loading a text file
from pyspark.sql import Row

lines = sc.textFile("file:///home/eduuser/spark/examples/src/main/resources/people.txt")
parts = lines.map(lambda l: l.split(","))
people = parts.map(lambda p: Row(name=p[0], age=int(p[1])))
df = sqlContext.createDataFrame(people)
Loading a text file with an explicit schema
from pyspark.sql.types import *

lines = sc.textFile("file:///home/eduuser/spark/examples/src/main/resources/people.txt")
parts = lines.map(lambda l: l.split(","))
people = parts.map(lambda p: (p[0], p[1].strip()))

schemaString = "name age"
fields = [StructField(field_name, StringType(), True) for field_name in schemaString.split()]
schema = StructType(fields)

schemaPeople = sqlContext.createDataFrame(people, schema)
Loading Parquet data
df = sqlContext.read.load("file:///home/eduuser/spark/examples/src/main/resources/users.parquet")
Saving a DataFrame
df.select("name", "favorite_color").write.save("file:///home/eduuser/namesAndFavColors.parquet") # df = sqlContext.read.load("file:///home/eduuser/namesAndFavColors.parquet") df.select("name", "age").write.save("file:///home/eduuser/namesAndAges.parquet", format="parquet")
Registering a DataFrame as a Temp Table
df.registerTempTable("people")
teenagers = sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")
teenagers.show()
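The same query can also be written against the DataFrame API instead of SQL; an equivalent sketch (an addition, not from the original page):

#--- Column-expression equivalent of the SQL above
teenagers = df.filter((df.age >= 13) & (df.age <= 19)).select("name")
teenagers.show()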
pyspark Sample 1
help(sqlContext)
q                                        # press q to leave the help pager

df = sqlContext.read.json("file:///home/eduuser/spark/examples/src/main/resources/people.json")
df.show()
df.printSchema()

df.select("name").show()
df.select(df['name'], df['age'] + 1).show()
df.filter(df['age'] > 21).show()
df.groupBy("age").count().show()

quit()
pyspark Sample 2
from pyspark.sql import Row

lines = sc.textFile("file:///home/eduuser/spark/examples/src/main/resources/people.txt")
parts = lines.map(lambda l: l.split(","))
people = parts.map(lambda p: Row(name=p[0], age=int(p[1])))

schemaPeople = sqlContext.createDataFrame(people)
schemaPeople.registerTempTable("people")

teenagers = sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")
teenagers.show()

# teenNames = teenagers.map(lambda p: "Name: " + p.name)
# for teenName in teenNames.collect():
#     print(teenName)

quit()
pyspark Sample 3
from pyspark.sql.types import *

lines = sc.textFile("file:///home/eduuser/spark/examples/src/main/resources/people.txt")
parts = lines.map(lambda l: l.split(","))
people = parts.map(lambda p: (p[0], p[1].strip()))

schemaString = "name age"
fields = [StructField(field_name, StringType(), True) for field_name in schemaString.split()]
schema = StructType(fields)

schemaPeople = sqlContext.createDataFrame(people, schema)
schemaPeople.registerTempTable("people")

teenagers = sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")
teenagers.show()

# teenNames = teenagers.map(lambda p: "Name: " + p.name)
# for teenName in teenNames.collect():
#     print(teenName)

quit()
pyspark Sample 4
#--- Load/save data
df = sqlContext.read.load("file:///home/eduuser/spark/examples/src/main/resources/users.parquet")

#--- Saving creates the directory /home/eduuser/namesAndFavColors.parquet/
df.select("name", "favorite_color").write.save("file:///home/eduuser/namesAndFavColors.parquet")
df = sqlContext.read.load("file:///home/eduuser/namesAndFavColors.parquet")

df = sqlContext.read.load("file:///home/eduuser/spark/examples/src/main/resources/people.json", format="json")
df.select("name", "age").write.save("file:///home/eduuser/namesAndAges.parquet", format="parquet")
Downloading the Dataset
cd ~
cd nia_kbig
[eduuser@localhost nia_kbig]$ ./datasetDownload.sh
다운로드받을 데이터셋 코드를 입력하세요.    # prompt: "Enter the dataset codes to download."
8h7k4 ka988 z24nt

cd view/basic
ls -alF
-rwxr-xr-x. 1 eduuser eduuser  216 2014-12-14 08:46 01.move_data_file.sh*
-rwxr-xr-x. 1 eduuser eduuser  283 2014-12-04 22:22 01.move_data_file.sh~*
-rwxr-xr-x. 1 eduuser eduuser  270 2014-12-12 16:20 jeju_2010.csv*
-rwxr-xr-x. 1 eduuser eduuser  273 2014-12-12 16:16 jeju_2011.csv*
-rwxr-xr-x. 1 eduuser eduuser  278 2014-12-12 16:16 jeju_2012.csv*
-rwxr-xr-x. 1 eduuser eduuser 1476 2015-01-11 22:59 view_basic_analysis.r*

# Optional: push the files into HDFS
# hdfs dfs -mkdir /user
# hdfs dfs -mkdir /user/eduuser/
# hdfs dfs -put jeju* /user/eduuser/
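If the commented hdfs dfs -put lines above are run, the same CSVs can be read from HDFS rather than the local filesystem. A sketch, assuming the default filesystem is the local HDFS and the upload path above:

lines = sc.textFile('hdfs:///user/eduuser/jeju_2010.csv')   # assumed namenode default; no file:// prefix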
pyspark Sample 5
from pyspark.sql import Row

lines = sc.textFile('file:///home/eduuser/nia_kbig/view/basic/jeju_2010.csv')
parts = lines.map(lambda l: l.split(','))
jeju2010 = parts.map(lambda p: Row(IN=p[0], OUT=p[1], INCOME=p[2]))

schema2010 = sqlContext.createDataFrame(jeju2010)
schema2010.registerTempTable("jeju2010")
schema2010.show()

#--- Drop the header row, which repeats the column names as data
sqlContext.sql("select * from jeju2010 where INCOME != 'INCOME'").show()
jeju2010 = sqlContext.sql("select * from jeju2010 where INCOME != 'INCOME'")
jeju2010.show()
pyspark Sample 6
from pyspark.sql.types import *

lines = sc.textFile('file:///home/eduuser/nia_kbig/view/basic/jeju_2011.csv')
parts = lines.map(lambda l: l.split(','))
jeju2011 = parts.map(lambda p: (p[0].strip(), p[1].strip(), p[2].strip()))

schemaString = "IN OUT INCOME"
fields = [StructField(field_name, StringType(), True) for field_name in schemaString.split()]
schema = StructType(fields)

schema2011 = sqlContext.createDataFrame(jeju2011, schema)
schema2011.registerTempTable('jeju2011')
schema2011.show()
pyspark Sample 7
from pyspark.sql import Row
from pyspark.sql.types import *

#--- Load data
lines = sc.textFile('file:///home/eduuser/nia_kbig/view/basic/jeju_2010.csv')
parts = lines.map(lambda l: l.split(','))
jeju2010 = parts.map(lambda p: Row(IN=p[0].strip(), OUT=p[1].strip(), INCOME=p[2].strip()))
schema2010 = sqlContext.createDataFrame(jeju2010)

schema2010.registerTempTable('jeju2010')
schema2010.show()

lines = sc.textFile('file:///home/eduuser/nia_kbig/view/basic/jeju_2011.csv')
parts = lines.map(lambda l: l.split(','))
jeju2011 = parts.map(lambda p: Row(IN=p[0].strip(), OUT=p[1].strip(), INCOME=p[2].strip()))
schema2011 = sqlContext.createDataFrame(jeju2011)

schema2011.registerTempTable('jeju2011')
schema2011.show()

lines = sc.textFile('file:///home/eduuser/nia_kbig/view/basic/jeju_2012.csv')
parts = lines.map(lambda l: l.split(','))
jeju2012 = parts.map(lambda p: Row(IN=p[0].strip(), OUT=p[1].strip(), INCOME=p[2].strip()))
schema2012 = sqlContext.createDataFrame(jeju2012)

schema2012.registerTempTable('jeju2012')
schema2012.show()

#--- Clean the data: drop the header rows, then tag each table with its year.
#--- Row sorts fields alphabetically, so here p[0]=IN, p[1]=INCOME, p[2]=OUT.
jeju2010 = sqlContext.sql("select * from jeju2010 where INCOME != 'INCOME'")
jeju2011 = sqlContext.sql("select * from jeju2011 where INCOME != 'INCOME'")
jeju2012 = sqlContext.sql("select * from jeju2012 where INCOME != 'INCOME'")

tmp = jeju2010.rdd.map(lambda p: Row(YEAR=2010, IN=p[0], INCOME=p[1], OUT=p[2]))
jeju2010 = sqlContext.createDataFrame(tmp)
tmp = jeju2011.rdd.map(lambda p: Row(YEAR=2011, IN=p[0], INCOME=p[1], OUT=p[2]))
jeju2011 = sqlContext.createDataFrame(tmp)
tmp = jeju2012.rdd.map(lambda p: Row(YEAR=2012, IN=p[0], INCOME=p[1], OUT=p[2]))
jeju2012 = sqlContext.createDataFrame(tmp)

jeju = jeju2010.unionAll(jeju2011).unionAll(jeju2012)
#--- Columns are now IN, INCOME, OUT, YEAR (alphabetical again); convert the
#--- string columns to int so the statistics and regression below work
tmp = jeju.rdd.map(lambda p: Row(YEAR=int(p[3]), IN=int(p[0]), INCOME=int(p[1]), OUT=int(p[2])))
jeju2 = sqlContext.createDataFrame(tmp)

#--- Descriptive statistics
jeju2.describe().show()
jeju2.groupBy('YEAR').avg().show()
jeju2.select('IN', 'INCOME', 'YEAR').groupBy('YEAR').mean().show()
jeju2.select('IN', 'OUT', 'YEAR').groupBy('YEAR').mean().show()

#--- Correlation coefficient
jeju2.corr('IN', 'OUT')

#--- Regression (ordinary least squares via the normal equations)
import numpy as np
import numpy.linalg as lin

X = np.array(jeju2.select('IN', 'OUT').collect())
Y = np.array(jeju2.select('INCOME').collect())

#--- Without an intercept term
Beta0 = np.dot(lin.inv(np.dot(X.T, X)), np.dot(X.T, Y))

#--- With an intercept term (36 rows: 12 months x 3 years)
X1 = np.hstack([np.array([np.ones(36)]).T, X])
Beta1 = np.dot(lin.inv(np.dot(X1.T, X1)), np.dot(X1.T, Y))

#--- Coefficient of determination (R squared)
R0 = np.sum((np.dot(X, Beta0) - np.mean(Y))**2) / np.sum((Y - np.mean(Y))**2)
R1 = np.sum((np.dot(X1, Beta1) - np.mean(Y))**2) / np.sum((Y - np.mean(Y))**2)

#--- Adjusted R squared (n = 36 samples; 2 and 3 regressors)
adR0 = 1 - (1 - R0) * (36 - 1) / (36 - 2 - 1)
adR1 = 1 - (1 - R1) * (36 - 1) / (36 - 3 - 1.)

#--- Visualization
import matplotlib.pylab as plt

plt.scatter(X[:, 0], Y)
plt.show()
plt.scatter(X[:, 1], Y)
plt.show()

#--- Simple regression on the IN column alone
Beta_IN = np.dot(lin.inv(np.dot(X1[:, [0, 1]].T, X1[:, [0, 1]])), np.dot(X1[:, [0, 1]].T, Y))
y = Beta_IN[0] + Beta_IN[1] * X1[:, 1]

plt.scatter(X1[:, 1], Y)
plt.plot(X1[:, 1], y)
plt.show()

plt.scatter(X1[:, 1], Y, label='Raw Data')
plt.plot(X1[:, 1], y, label='Fitted', color='red')
plt.legend(loc='upper left')
plt.show()

from mpl_toolkits.mplot3d import Axes3D
from matplotlib import cm

fig = plt.figure()
ax = Axes3D(fig)
y = Beta1[0] + Beta1[1] * X[:, 0] + Beta1[2] * X[:, 1]
XX1, XX2 = np.meshgrid(X[:, 0], X[:, 1])
YY = Beta1[0] + Beta1[1] * XX1 + Beta1[2] * XX2
ax.plot(X[:, 0], X[:, 1], y, linestyle='none', marker='o', markerfacecolor='blue')
ax.plot_surface(XX1, XX2, YY, rstride=1, cstride=1, cmap='hot')
plt.show()
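Solving the normal equations with an explicit matrix inverse can be numerically fragile; as a cross-check (an addition, not in the original course code), numpy's built-in least-squares solver should reproduce Beta1:

#--- Least-squares cross-check of the intercept model
Beta1_check, residuals, rank, sv = np.linalg.lstsq(X1, Y)
# Beta1_check should agree with Beta1 from the normal equations above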