중화사전망 - 서예자전 - Scala 에서 rdd type 에 사용되는 헤더 파일은 무엇입니까?

Scala 에서 rdd type 에 사용되는 헤더 파일은 무엇입니까?

1 소개. RDD:

RDD 는 요소의 분산 집합인 유연한 분산 데이터 세트입니다. Spark 에서 모든 데이터에 대한 작업은 RDD 생성, 기존 RDD 변환, 평가를 위해 RDD 작업을 호출하는 것 이상입니다. 이 모든 것 뒤에 Spark 는 RDD 의 데이터를 클러스터에 자동으로 배포하고 작업을 병렬화합니다.

Spark 의 RDD 는 변경되지 않는 분산 객체 세트입니다. 각 RDD 는 클러스터의 서로 다른 노드에서 실행되는 여러 파티션으로 나뉩니다. RDD 에는 Python, Java, Scala 에 있는 모든 유형의 객체, 심지어 사용자 정의 객체까지 포함될 수 있습니다.

사용자는 외부 데이터 세트를 읽거나 list 또는 set 와 같은 객체 컬렉션을 드라이버에 배포하는 두 가지 방법으로 RDD 를 만들 수 있습니다.

RDD 의 변환 작업은 지연 평가됩니다. 즉, RDD 에서 변환 작업을 호출할 때 작업이 즉시 실행되지 않습니다. 대신 Spark 는 요청된 작업에 대한 정보를 내부적으로 기록합니다. RDD 를 구체적인 데이터가 있는 데이터 세트로 보아서는 안 되며, 각 RDD 를 변환 연산을 통해 구성한 명령 목록으로 보고 데이터 계산 방법을 기록하는 것이 좋습니다. 데이터를 RDD 로 읽는 작업도 게을러서 필요할 때만 데이터를 읽습니다. 변환 및 읽기 작업을 여러 번 수행할 수 있습니다.

2. RDD 데이터 세트를 생성합니다

(1) 외부 데이터 세트 읽기

Val input = sc.textfile (input file dir)

(2) list 를 예로 들어 객체 세트를 배포합니다.

Vallines = sc.parallelize (list ("hello world", "이것은 테스트입니다"));

3.RDD 작업

(1) 변환 작업

필터링 변환 작업을 수행하려면 다음과 같이 하십시오.

Vallines = sc.parallelize (list ("error: a", "error:b", "error:c", "test "));)

Valerrors = lines.filter (line = > Line.contains ("error"));

Errors.collect () 를 사용합니다. Foreach (println);

출력:

오류: a

오류: b

오류: c

목록에 error 라는 단어가 포함된 항목이 올바르게 필터링되었음을 알 수 있습니다.

(2) 병합 작업

두 개의 RDD 데이터 세트를 하나의 RDD 데이터 세트로 결합합니다.

위의 절차 예를 따릅니다.

Vallines = sc.parallelize (list ("error: a", "error:b", "error:c", "test", "

Valerrors = lines.filter (line = > Line.contains ("error"));

Valwarnings = lines.filter (line = > Line.contains ("warnings"));

Val unionLines =errors.union (경고);

UnionLines.collect () 를 사용합니다. Foreach (println);

출력:

오류: a

오류: b

오류: c

경고: a

원래 목록 항목의 오류 항목과 경고 항목이 모두 필터링된 것을 볼 수 있습니다.

(3) RDD 데이터 세트의 일부 또는 모든 요소 얻기

① RDD 데이터 세트의 일부 요소를 얻을 수 있습니까? 。 Take (정수)? 반환 값 목록

RDD 데이터 세트에서 처음 num 개 항목을 가져옵니다.

/* *

* RDD 의 처음 num 개 요소를 가져옵니다. 이렇게 하면 구역을 하나씩 스캔합니다.

* 많은 파티션이 필요한 경우 속도가 느려질 수 있습니다. 이 경우 collect () 를 사용하여 가져옵니다

* 전체 RDD 로 변경합니다.

*/

Def take(num: Int): JList[T]

프로그램 예: 연결

UnionLines.take(2) 입니다. Foreach (println);

출력:

오류: a

오류: b

RDD 데이터 세트 연합선의 처음 두 항목이 출력되었음을 알 수 있습니다.

② RDD 데이터 세트의 모든 요소를 가져옵니다. Collect () 는 목록을 반환합니다.

프로그램 예:

Val all = unionlines.collect ();

All.foreach (println);

출력 RDD 데이터 세트 통합 라인의 각 항목을 통과합니다.

4. spark 에 기능을 전달합니다

Scala 에서는 Scala 의 다른 함수 기반 API 와 마찬가지로 정의된 인라인 함수, 메서드 참조 또는 정적 메서드를 Spark 에 전달할 수 있습니다. 우리는 반드시 다른 세부 사항을 고려해야 한다. 전달해야 하는 함수와 이들이 참조하는 데이터는 직렬화 가능해야 합니다 (Java 의 Serializable 인터페이스 구현). 또한 파이썬과 마찬가지로 객체의 메서드나 필드를 전달할 때 전체 객체에 대한 참조가 포함됩니다. 전체 객체에 필드가 포함되지 않도록 원하는 필드를 로컬 변수에 배치할 수 있습니다.

클래스 검색 함수 (val 쿼리: 문자열) {

Def isMatch(s: String): 부울 = {

포함 (질의)

}

Def getmatchfunctionreference (rdd: rdd [문자열 ]):RDD[ 문자열] = {

//질문: isMach 는 this.isMatch 를 대표하므로 전체 this 를 통과해야 합니다.

Rdd.filter(isMatch)

}

Def getmatchesfunctionreference (rdd: rdd [문자열 ]):RDD[ 문자열] = {

//질문: query 는 this.query 를 대표하므로 this 전체를 전달해야 합니다.

Rdd.flatmap (line = > Line.split (조회))

}

Def getmatchesnoreference (rdd: rdd [문자열 ]):RDD[ 문자열] = {

//보안, 필요한 필드만 제거하고 로컬 변수에 넣습니다.

Valquery1= this.query;

Rdd.flatmap (x = > X.split (조회 1)

) 을 참조하십시오

}

}

5. 각 요소 변환 작업에 대해 다음을 수행합니다.

변환 작업 맵 () 은 함수를 수신하고, 이 함수를 RDD 의 각 요소에 적용하고, 이 함수의 반환 결과를 결과 RDD 의 해당 요소로 사용합니다. 키워드: 변환

변환 작업 필터 () 는 함수를 받아들이고 새 RDD 의 함수를 만족하는 RDD 의 요소를 반환합니다. 키워드: 필터링

예제 그림은 다음과 같습니다.

① 지도 ()

각 RDD 값의 제곱을 계산합니다.

Valrdd = sc.parallelize (list (1,2,3,4));

Valresult = rdd.map (값 = > 값 * 값);

Println(result.collect (). MkString (",");

출력:

1,4,9, 16

필터 ()

②? RDD 세트에서 값이 1 인 요소를 삭제합니다.

Valrdd = sc.parallelize (list (1,2,3,4));

Valresult = rdd.filter (값 = > 값! = 1);

Println(result.collect (). MkString (",");

결과:

2,3,4

우리는 다음과 같은 전달 함수의 형태를 취할 수도 있습니다.

기능:

Def filter function (값: int): 부울 = {

값! = 1

}

사용:

Valrdd = sc.parallelize (list (1,2,3,4));

Valresult = rdd.filter (필터 기능);

Println(result.collect (). MkString (",");

③ 경우에 따라 각 입력 요소에 대해 여러 개의 출력 요소를 생성하려고 합니다. 이 기능을 구현하는 작업을 flatMap () 이라고 합니다. Map () 과 마찬가지로 flatMap () 에 제공한 함수는 RDD 를 입력하는 각 요소에 적용됩니다. 그러나 요소가 아니라 반환 값 시퀀스의 반복자입니다. 출력 RDD 는 반복자로 구성되어 있지 않습니다. 각 반복자가 액세스할 수 있는 모든 요소가 포함된 RDD 를 얻습니다. FlatMap () 의 간단한 사용법은 다음 그림과 같이 입력 문자열을 단어로 자르는 것입니다.

Valrdd = sc.parallelize (list ("hello world", "hello you", "world I love you ")););

Valresult = rdd.flatmap (line = > Line.split ("");

Println(result.collect (). Mkstring ("\ n");

출력:

안녕하세요

세계

안녕하세요

너희들

세계

사랑

너희들

6. 집합 연산

RDD 에서 작업 설정

기능

사용

RDD 1.distinct ()

다른 요소만 포함하는 새 RDD 를 생성합니다. 데이터 셔플이 필요합니다.

RDD 1.union(RDD2)

두 RDD 의 모든 요소가 포함된 RDD 를 반환합니다.

RDD 1. 교차 (RDD2)

두 rdd 모두에 존재하는 요소만 반환됩니다.

RDD 1.substr(RDD2)

첫 번째 RDD 에만 있고 두 번째 RDD 에는 없는 모든 요소로 구성된 RDD 를 반환합니다. 데이터 셔플이 필요합니다.

집합 연산에 의한 데카르트 집합 처리:

RDD 1. 데카르트 좌표 (RDD2)

두 개의 RDD 데이터 세트의 데카르트 컬렉션을 반환합니다.

프로그램 예: RDD 세트 {1, 2} 및 {1, 2} 에 대한 데카르트 세트를 생성합니다.

Valrdd1= sc.parallelize (list (1,2));

Valrdd2 = sc.parallelize (list (1,2));

Val rdd = rdd1.cartesian (rdd2);

Println(rdd.collect (). Mkstring ("\ n");

출력:

(1, 1)

(1,2)

(2, 1)

(2,2)

7. 작업

(1) 감소 연산

Reduce () 는 두 RDD 요소 유형의 데이터를 조작하고 동일한 유형의 새 요소를 반환하는 함수를 매개 변수로 사용합니다. 간단한 예는 우리의 RDD 를 축적하는 데 사용할 수 있는 함수+입니다. Reduce () 를 사용하면 RDD 에 있는 모든 요소의 합계, 요소 수 및 기타 유형의 합산 작업을 쉽게 계산할 수 있습니다.

다음은 RDD 데이터 세트의 모든 요소를 합산하는 프로그램의 예입니다.

Valrdd = sc.parallelize (list (1,2,3,4,5,6,7,8,9,/kloc-0)

Val results=rdd.reduce((x, y) = > X+y);

Println (결과);

생산량: 55

(2) 접기 () 작업

Reduce () 에서 수신한 함수와 동일한 함수를 받고 각 파티션에 대한 첫 번째 호출의 결과로 초기 값을 추가합니다. 제공한 초기 값은 사용자가 제공한 연산의 단위 요소여야 합니다. 즉, 함수를 사용하여 이 초기 값을 여러 번 계산해도 결과가 변경되지 않습니다 (예:+대응 0, * 대응 1 또는 패치 연산에 해당하는 빈 목록).

프로그램 예:

① RDD 데이터 세트의 모든 요소의 합계를 계산합니다.

Zerovalue = 0; //합계 시 초기 값은 0 입니다.

Valrdd = sc.parallelize (list (1,2,3,4,5,6,7,8,9,/kloc-0)

Val results=rdd.fold(0)((x, y) = > X+y);

Println (결과);

② RDD 데이터 세트의 모든 요소의 곱 계산:

제로값 =1; //적분할 때 초기 값은 1 입니다.

Valrdd = sc.parallelize (list (1,2,3,4,5,6,7,8,9,/kloc-0)

Valresults = rdd.fold (1) ((x, y) = > X * y);

Println (결과);

(3)aggregate () 연산

Aggregate () 함수의 반환 값 유형이 작업의 RDD 유형과 같을 필요는 없습니다.

Fold () 와 마찬가지로 aggregate () 를 사용할 때 반환할 유형의 초기 값을 제공해야 합니다. 그런 다음 함수를 사용하여 RDD 의 요소를 결합하고 누적기에 넣습니다. 각 노드가 로컬로 누적되는 것을 고려하여 마지막으로 두 번째 함수를 제공하여 누적기를 쌍으로 조합해야 합니다.

다음은 프로그램 예제입니다.

Valrdd = sc.parallelize (list (1,2,3,4,5,6,7,8,9,/kloc-0)

Valresult = rdd.aggregate ((0,0)) (

(ACC, 값) = > (ACC. _ 1+ 값, 일치. _2+ 1),

(ACC 1, ACC 2)=>;; (ACC 1. _ 1+acc2. _ 1, ACC 1. _2+acc2. _2)

) 을 참조하십시오

평균 = 결과. _ 1/ 결과. _2;

Println (평균)

출력: 5

마지막으로 반환된 것은 튜플 2 입니다

표: 데이터가 {1, 2,3,3} 인 RDD 에 대한 기본 RDD 작업을 수행합니다.

함수 이름 항목의 샘플 결과

Collect () 는 rdd.collect () 의 모든 요소 {1, 2,3,3} 을 반환합니다

Count () 의 요소 수 RDD rdd.count() 4

CountByValue () 각 요소가 RDD RDD 에 나타나는 횟수입니다. Countbyvalue () {( 1, 1),

(2, 1),

(3,2)

}

Take(num) 는 rdd 에서 num 요소 rdd.take(2) {1, 2} 를 반환합니다.

Top(num) 은 RDD 의 처음 num 개 요소를 반환합니다. RDD 에서 주문 (2) (내 주문) {3,3}.

주문됨 (수량)

(정렬) 제공된 순서대로 RDD 에서 처음 num 개 요소를 반환합니다.

Rdd.takeSample(false, 1) 은 불확실합니다.

Take sample (with replacement, num, [seed]) 은 rdd 에서 원하는 수의 요소를 반환하며 takeSample(false, 1) 은 불확실합니다.

Reduce(func) 는 RDD 의 모든 데이터를 동시에 통합합니다. Reduce ((x, y) = > x+y)

아홉;구;9

Fold(zero)(func) 는 reduce () 와 동일하지만 초기 값 rdd.fold(0)((x, y) = > X+y)

아홉;구;9

Aggregate (0 값) (seq op, combo) 는 reduce () 와 비슷하지만 일반적으로 다른 유형의 함수 rdd.aggregate((0 (0,0)) 를 반환합니다.

((x, y) = >

(x._ 1+y, x._2+ 1),

(x, y) = >

(x. 1+y. 1, x. 2+y. 2)

) (9,4)

Foreach(func) 는 RDD.foreach(func) None 의 각 요소에 지정된 함수 rdd 를 사용합니다.

8. 영구 캐시

스파크 RDD 는 게으르기 때문에, 때때로 우리는 같은 RDD 를 여러 번 사용하고 싶다. RDD 에서 동작을 간단히 호출하면 Spark 는 매번 RDD 와 모든 연결지를 다시 계산합니다. 반복 알고리즘은 종종 동일한 데이터 세트를 여러 번 사용하기 때문에 반복 알고리즘에서 특히 비쌉니다.

동일한 RDD 가 여러 번 계산되지 않도록 Spark 는 데이터를 지속할 수 있습니다. Spark 에게 RDD 를 영구적으로 저장하라고 하면 RDD 를 계산하는 노드는 이미 계산된 파티션 데이터를 저장합니다.

다른 목적을 위해 RDD 에 대해 다른 지속성 수준을 선택할 수 있습니다. 기본적으로 persist () 는 JVM 의 힙 공간에 직렬화된 형식으로 데이터를 캐시합니다.

서로 다른 키워드에 대한 저장 레벨 테이블

등급

사용된 공간

중앙 처리 시간

메모리에 있습니까?

디스크에 있습니까?

평론

메모리만 해당

높은

낮은

이다

아니

메모리에 직접 저장

사용자만 저장

낮은

높은

이다

아니

직렬화하여 메모리에 저장합니다.

메모리 및 디스크

낮은

매체

부분

부분

메모리에 데이터가 들어가지 않으면 디스크로 넘칩니다.

메모리 및 디스크 사용자

낮은

높은

부분

부분

데이터가 메모리에 들어가지 않아 디스크에서 넘칩니다. 직렬화된 데이터는 메모리에 저장됩니다.

디스크만

낮은

높은

아니

이다

하드 드라이브에 직접 저장합니다.

프로그램 예: RDD 데이터 세트를 메모리에 저장

Valrdd = sc.parallelize (list (1,2,3,4,5,6,7,8,9,/kloc-0) 지속성 (StorageLevel). Memory _ only);

인쇄 (rdd.count ())

Println(rdd.collect (). MkString (",");

RDD 에는 캐시에서 영구 RDD 를 수동으로 제거하기 위해 호출할 수 있는 unpersist () 메서드도 있습니다.

9. 다른 RDD 유형

Scala 에서 RDD 를 특정 함수로 변환하는 RDD (예: RDD[Double] 의 숫자 연산) 는 암시적 변환을 통해 자동으로 처리됩니다. 이러한 암시적 변환은 RDD 를 DoubleRDDFunctions (숫자 데이터의 RDD) 및 PairRDDFunctions (키 값 쌍의 RDD) 와 같은 다양한 패키지 클래스로 암시적으로 변환하여 mean () 및 variance () 를 제공합니다

예제 프로그램:

Valrdd = sc.parallelize (list (1.0, 2.0, 3.0, 4.0, 5.0));

Println (rdd.mean ());

실제로 RDD[T] 에는 mean () 함수가 없지만 암시적 변환은 이를 DoubleRDDFunctions 로 자동 변환합니다.