스파크 개념
- 대용량 데이터 처리에서 훨씬 빠른 성능을 내는 엔진
- 확장성
- 드라이버(SparkContext): 통제 → 네임 노드
- 클러스터 매니져
- Executors: 데이터 노드
- Cache
- Tasks
- 속도
- 맵리듀스보다 적어도 100배는 빠른 속도를 가지고 있다.
- DAG 엔진으로 최적화를 할 수 있다.
- 유행
- AWS, Ebay 등 많은 기업이 스파크를 사용하고 있다.
- 쉬움
- 파이썬, 자바, 스칼라로 프로그래밍할 수 있다.
- RDD
스파크 구성
- Spark Streaming
- Spark SQL
- SQL로 쿼리를 만들고 변환할 수 있다.
- MLLib
- 스파크로 머신러닝
- GraphX
- 소셜 그래프를 통해 분석
RDD(Resilient Distributed Dataset)
- 탄력적 분산 데이터셋
- 클러스터 전체에 걸쳐 작업이 고르게 분산되어 실패에 탄력적으로 대처할 수 있게 한다.
- 드라이버가 SparkContext를 만들고 그걸 통해 RDD를 만들수 있다.
- S3, HDFS, hive, JDBC, JSON 등 대부분의 데이터베이스로부터 RDD를 만들 수 있다.
- RDD 변환
- map, flatmap, distinct, filter, union, intersection 등 RDD를 변환할 수 있는 다양한 함수가 있다.
- rdd.map(lambda x: x*x)
- RDD actions
- RDD의 모든 결과를 드라이버로 가져와 모든 작업을 수행할 수 있는 오브젝트를 텍스트 파일로 저장해 반환한다.
- collect, count, countByValue, take, top, reduce...
- 지연 평가: action 함수를 호출하기 전까지는 실제로 시작되지 않는다. 그리고 실제로 해당 함수를 호출하면 가장 효율적인 방법을 역으로 탐색하여 실행한다.
Spark 코드
- Ambari Spark의 Configuration에서 spark의 log4j 부분의 설정을 기본값으로 바꾼다. log4j.rootCategory=ERROR
conf = SparkConf().setAppName("WorstMovies")
-> 분배 방식이나 실행되는 클러스트 종류, 익스큐터에 할당된 메모리 할당 등을 구성할 수도 있다.
sc = SparkContext(conf = conf)
# Load up our movie ID -> movie name lookup table
movieNames = loadMovieNames()
# Load up the raw u.data file
lines = sc.textFile("hdfs:///user/maria_dev/ml-100k/u.data")
# Convert to (movieID, (rating, 1.0))
movieRatings = lines.map(parseInput)
# Reduce to (movieID, (sumOfRatings, totalRatings))
ratingTotalsAndCount = movieRatings.reduceByKey(lambda movie1, movie2: ( movie1[0] + movie2[0], movie1[1] + movie2[1] ) )
# Map to (rating, averageRating)
averageRatings = ratingTotalsAndCount.mapValues(lambda totalAndCount : totalAndCount[0] / totalAndCount[1])
# Sort by average rating
sortedMovies = averageRatings.sortBy(lambda x: x[1])
# Take the top 10 results
results = sortedMovies.take(10)
- spark-submit
- 모든 클러스터 매니저 간에 작업을 제출해주는 툴. 클러스터에서 애플리케이션을 수행하기 위해 사용한다. 즉, 작성한 코드를 스파크에 제출하여 실행하는 역할.
- 파라미터를 전달하여 실행할 클러스터나 메모리 할당을 할 수 있다.
Spark SQL
- 데이터프레임
- 행 객체의 데이터셋
- 스파크는 구조화된 데이터를 다루는 방향으로 진화하는 추세이다.
- RDD의 여러 단점을 보완(메모리 관리 측면, 실행 계획 최적화)하기 위해 데이터프레임으로 확장한다.
- 그렇게 되면 SQL을 사용할 수 있다. 구조화된 데이터를 사용하는 머신러닝 분야에서도 큰 강점을 가지게 되었다.
- 데이터셋
- 데이터프레임을 포함하는 조금 더 일반적인 개념
- RDD의 편의성과 데이터프레임의 퍼포먼스 최적화를 제공한다.
- UDF
- 사용자 정의 함수를 생성하여 SQL에 연결하고 자신의 함수를 만들 수 있다.
Spark 2.0 코드
- SparkSession()
- SparkContext와 SparkSQL을 포함
- export SPARK_MAJOR_VERSION=2로 spark2를 사용한다는 것을 호튼웍스에 알린다.
spark = SparkSession.builder.appName("PopularMovies").getOrCreate() # Load up our movie ID -> name dictionary movieNames = loadMovieNames() # Get the raw data lines = spark.sparkContext.textFile("hdfs:///user/maria_dev/ml-100k/u.data") # Convert it to a RDD of Row objects with (movieID, rating) movies = lines.map(parseInput) # Convert that to a DataFrame movieDataset = spark.createDataFrame(movies) # Compute average rating for each movieID averageRatings = movieDataset.groupBy("movieID").avg("rating") # Compute count of ratings for each movieID counts = movieDataset.groupBy("movieID").count() # Join the two together (We now have movieID, avg(rating), and count columns) averagesAndCounts = counts.join(averageRatings, "movieID") # Pull the top 10 results topTen = averagesAndCounts.orderBy("avg(rating)").take(10)
MLLib
spark = SparkSession.builder.appName("MovieRecs").getOrCreate()
# This line is necessary on HDP 2.6.5:
spark.conf.set("spark.sql.crossJoin.enabled", "true")
# Load up our movie ID -> name dictionary
movieNames = loadMovieNames()
# Get the raw data
lines = spark.read.text("hdfs:///user/maria_dev/ml-100k/u.data").rdd
# Convert it to a RDD of Row objects with (userID, movieID, rating)
ratingsRDD = lines.map(parseInput)
# Convert to a DataFrame and cache it
ratings = spark.createDataFrame(ratingsRDD).cache()
# Create an ALS collaborative filtering model from the complete data set
als = ALS(maxIter=5, regParam=0.01, userCol="userID", itemCol="movieID", ratingCol="rating")
model = als.fit(ratings)
# Print out ratings from user 0:
print("\\\\nRatings for user ID 0:")
userRatings = ratings.filter("userID = 0")
for rating in userRatings.collect():
print movieNames[rating['movieID']], rating['rating']
print("\\\\nTop 20 recommendations:")
# Find movies rated more than 100 times
ratingCounts = ratings.groupBy("movieID").count().filter("count > 100")
# Construct a "test" dataframe for user 0 with every movie rated more than 100 times
popularMovies = ratingCounts.select("movieID").withColumn('userID', lit(0))
# Run our model on that list of popular movies for user ID 0
recommendations = model.transform(popularMovies)
# Get the top 20 movies with the highest predicted rating for this user
topRecommendations = recommendations.sort(recommendations.prediction.desc()).take(20)
for recommendation in topRecommendations:
print (movieNames[recommendation['movieID']], recommendation['prediction'])
spark.stop()
반응형
'데이터 엔지니어링 > Hadoop' 카테고리의 다른 글
Sec.6 NoSQL & Hadoop (0) | 2022.01.25 |
---|---|
Sec5. Hadoop & RDB (0) | 2022.01.25 |
Sec3. Pig (0) | 2022.01.25 |
Sec2. HDFS & MapReduce (0) | 2022.01.25 |
Sec1. 하둡 설치 및 개요 (0) | 2022.01.18 |