2024-04-29 Lunes

¿De dónde eres?

Norte de China

Alemania

  • alemán
  • alemana

Marruecos 摩洛哥

  • marroquí m + f

Argentina -f, also the name of the country

  • Argentino -m

Francia

  • francés
  • francesa

Soy venezolano/a = soy de Venezuela

¿Cuántos años tienes?

No pareces. = You don't look it. / Pareces más joven. = You look younger.

¿Cuánto tiempo vas a estudiar? (ir a + infinitive, future) = How long are you going to study?

tres semanas

un fin de semana = a weekend

un semestre = a semester, usually six months

¿En qué trabajas? = What do you do for work?

Trabajar + como: Trabajo como informático.

Trabajar + en: Trabajo en una empresa.

Estoy jubilado = I am retired

Quiero jubilarme en Ecuador. = I want to retire in Ecuador.

¡Qué suerte! = what luck!, lucky you!

Antes = before

  • Antes era “CEO” de varias empresas internacionales. = Before, I was CEO of several international companies.

¿Dónde vives aquí en Madrid? = Where do you live here in Madrid?

En la calle Fuencarral = On Fuencarral Street

calle = street

Vosotros tenéis que visitar los barrios de Malasaña, Justicia y Chueca. = You (plural) have to visit the Malasaña, Justicia and Chueca neighborhoods.

Metro Tribunal

Camino a Sampere = I walk to Sampere.

Muy cerca de Sampere = Very close to Sampere

Vivo en Las Rosas, que está más lejos. = I live in Las Rosas, which is farther away.

lejos = far away

  • cerca de = close to
  • lejos de = far away from

Vivo en el norte de Madrid.

¿Quieres visitarlo? = Do you want to visit it?

Santiago Bernabéu = the Santiago Bernabéu, an all-seater football stadium in Madrid, Spain

  • sala de trofeos = trophy room

edificio = building

antiguo = old

  • El edificio nuevo
  • El edificio antiguo

¿Cómo se dice en español? = How do you say it in Spanish?

  • Es muy caro = it is very expensive
  • Es muy barato = it is very cheap

Like, love

  • me encanta = I love
  • me gusta = I like

muchas veces = lots of times

simpático = nice

una vez a la semana = 1 time per week

Vuelvo a Singapur en 3 semanas. = I go back to Singapore in 3 weeks.

¿Qué te gusta hacer en tu tiempo libre? = What do you like to do in your free time?

Me gusta ver películas en español con subtítulos y series. = I like watching movies in Spanish with subtitles, and series.

películas = movies

¿Qué mas? = What else?

jugar al ajedrez = play chess

cansado = tired

a mí también = me too

Me gusta hacer senderismo en las montañas. = I like hiking in the mountains.

Me gusta mucho. = I like it a lot.

  • el cine = cinema
  • comer en restaurantes = eat in restaurants
  • quedar con mis amigos en terrazas (terrace, balcony, outdoor seating area) = meet up with my friends on outdoor terraces
  • enseñar español = teach Spanish

¿Cuántas clases de español tienes al día? = How many Spanish classes do you have per day?

  • clase = class, classroom; clases = classes

Sequences

  • 1a = la primera = first
  • 2a = la segunda = second
  • 3a = la tercera
  • 4a = la cuarta
  • 5a = la quinta
  • 6a = la sexta

OK in Spanish

  • ¡Claro!
  • ¡Por supuesto! = of course

¿listos? = ready?

  • listo / lista
    • meaning 1: the name of a metro station
    • meaning 2: inteligente
    • meaning 3: ready = preparado/preparada

Reflexivo = reflexive (verb)

Ser +

  • nacionalidad
    • Soy de Suecia (Soy + de + país o ciudad)
    • Pedro es sueco
    • Pedro y María son alemanes. (nombre m + nombre f + son + adjetivo plural masculino)
  • profesión
  • nombre

masculino / femenino para la nacionalidad

  • -o m
  • -a f
  • -e m,f

trabajar +

  • en una empresa(company)
  • como informático

Jorge es dramaturgo, escribe obras de teatro. = Jorge is a playwright; he writes plays.

diseñadora = woman designer

  • diseñador - m
  • diseñadora - f

diseñador de moda = fashion designer

Periodista - m, f = journalist

-ista is m + f

  • tenista = tennis player
    • Gabriela Sabatini es tenista
    • Rafa Nadal es tenista
  • futbolista
  • taxista = taxi driver

socorrista = lifeguard, protects people while they are swimming

peluquero/-a = hairdresser

pescadero/-a = fish merchant

fisherman:

  • pescador, pescadores
  • pescadora, pescadoras

cocer = cocinar = to cook

cajero, cajera = cashier

supermercado = supermarket

científico, científica = scientific, scientist (adj or a job)

Mi casero se llama Ng, es cocinero y trabaja en una tienda de comida.

  • el casero; la casera = landlord

Mis amigos son informáticos en unas empresas de Singapur.

  • My friends are computer scientists at some companies in Singapore.

ingeniera - f, ingeniero - m = engineer; ingeniería = engineering

bombero - m, bombera - f = firefighter

policía, m/f the same = police officer

  • María es policía.
  • Pedro es policía y trabaja en una comisaría de policía.
    • comisaría = police station

repartidor - m, repartidora - f = delivery man/woman

  • Pedro es repartidor de Amazon.

cartero - m, cartera - f = postman/postwoman

  • Maria es cartera y trabaja en correos.
    • correos = post office

conductor - m, conductora - f = driver

  • Pedro es conductor de autobús.

veterinario/-a = veterinarian

vegetariano/-a = vegetarian

something about the class

  • Todos los lunes hay desayuno gratis (Free breakfast is available every Monday)
  • pausa = rest between 2 classes

Tres formas para decir (to say) el nombre:

  • Me llamo Vanesa
  • Soy Vanesa
  • Mi nombre es Vanesa

estética = aesthetics

mismo

  • the same as someone
    • Lo mismo que + name
    • Igual que + name

es lo mismo = it’s the same

ahora mismo = right now

enseguida = right away

pasar = to spend (time)

dieciséis = diez + seis

esperar = to wait

  • espere verde = sign at a crossing: wait for the light to turn green

algo = something

broma = joke

saludar = to greet, to say hello

despedirse = to say goodbye

  • covered in class, but I remembered it wrong

pelo = cabello = hair

  • pelo can also mean fur / body hair

flambear = to flambé, to burn

  • fuego = fire
  • flambear = hacer un fuego grande para cocinar = Make a big fire for cooking

hilo = thread

  • pequeña porción de tela que se usa para coser la ropa = small piece of material used to sew clothes

perchero = clothes rack, coat stand

Ser, Estar

mal visto = frowned upon, considered improper

  • está mal visto

característica = feature

ser usage examples:

  • característica física (appearance): Soy pequeño. = I am small.
  • característica de personalidad (personality): Soy introvertido. = I am introverted. / Soy una persona triste/alegre. = I am a sad/cheerful person.
  • nacionalidad (nationality): Soy de China.
  • material (¿De qué está hecho? = what is it made of): El teclado está hecho de plástico. = The keyboard is made of plastic.

estar usage examples:

  • ubicaciones (location): El café está cerca de aquí.
  • pasajero (short-lived, fleeting): Estoy triste/alegre. = I am sad/happy (right now), in contrast with the ser trait Soy introvertido. = I am introverted.
  • tener que

  • People’s status

    • soltero, soltera = single
    • casado, casada = married
    • divorciado, divorciada = divorced (man/woman)
    • tengo novio/novia = I have a boyfriend/girlfriend
    • tengo pareja = I have a partner
      • pareja: m, f
  • así que = so, therefore

  • conoces a alguien = meet somebody

to try

  • intentar
    • inténtalo = try it
  • probar
    • Prueba esta sopa, está deliciosa.

age = edad

to be late

  • llegar tarde
    • llega tarde a clase
  • es muy tarde
    • Oh, es muy tarde, me voy.

siéntate = have a seat

los días de la semana

  • el lunes
  • el martes
  • el miércoles
  • el jueves
  • el viernes

el finde = the weekend

  • el sábado
  • el domingo

Saying dates

  • el 2 de mayo de 1808

¿Cuántas vocales tenemos?

  • a e i o u
  • Hay 5 vocales y 22 consonantes
  • En España la b y la v suenan igual
  • La h es muda. Pero tenemos ch.
    • mudo, muda = mute, silent (adj)

Pronunciation

  • suave = soft

  • fuerte = strong

  • g

    • g + a = suave
    • g + e = j
      • gue = suave
    • g + i = j
      • gui = suave
    • g + o = suave
    • g + u = suave
    • g + ü + i : Si la u tiene 2 puntos (two dots) suena. = when the u carries the two dots (diéresis), it is pronounced
  • c

    • suena fuerte (hard k sound)
      • ca
      • que
      • qui
      • co
      • cu
    • suena suave (soft sound)
      • ce = se = ze
      • ci = si = zi

taller = workshop

  • taller de cocina = cooking workshop
  • taller literario = literary workshop
  • taller de pintura = paint shop / painting workshop
  • taller mecánico = car repair shop

Problem

We have a left table and a right table. After repartitioning, the rows that have to be joined with each other end up concentrated in one pair of partitions.

Here is the data inside each partition:

  left table partition L    right table partition R
  1                         4
  4                         4
  8                         7
                            10

Suppose that after RDD.zipPartitions, L and R are paired together and have to be joined (a minimal sketch of this setup follows the questions below).

With Spark codegen turned off, how many times will Spark visit the data inside L and R?

  • Will it be L * R? Like 3 * 4 = 12?
  • Will it be L + R? Like 3 + 4 = 7?
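
The setup can be reproduced roughly as follows. This is only a minimal sketch, assuming a local SparkSession; the sample keys come from the table above, and the join strategy itself decides how often each iterator is advanced:

    import org.apache.spark.sql.SparkSession

    // Minimal sketch: two single-partition RDDs holding the sample keys,
    // paired by RDD.zipPartitions so that a single task sees both L and R.
    val spark = SparkSession.builder().master("local[1]").appName("zip-partitions-demo").getOrCreate()
    val sc = spark.sparkContext

    val leftPart  = sc.parallelize(Seq(1, 4, 8), numSlices = 1)      // left table partition L
    val rightPart = sc.parallelize(Seq(4, 4, 7, 10), numSlices = 1)  // right table partition R

    // Here we only count the rows to show the pairing; the real join logic
    // (and hence the number of visits) lives in SortMergeJoinExec, analyzed below.
    val summary = leftPart.zipPartitions(rightPart) { (l, r) =>
      Iterator(s"L has ${l.size} rows, R has ${r.size} rows")
    }
    summary.collect().foreach(println)   // prints: L has 3 rows, R has 4 rows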

Result

Time Complexity

It is L + R + 2 × C, where:

  • C is the number of right-side rows that actually get joined
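
For the sample partitions above, L = 3, R = 4 and C = 2 (the two right-side rows with key 4), so this counting gives 3 + 4 + 2 × 2 = 11 data visits.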

Spark Optimization

If the right-side table has many rows that cannot be joined, Spark still wastes time iterating over them. We can try the following (a minimal sketch of the filtering idea comes after the list):

  • Add more partitions to reduce the chance of hash collisions piling unrelated keys into one partition
  • Use an appropriate filter before the join, so that Spark never iterates over records that cannot be joined
  • Use a Bloom filter when we are not sure how to write the filter
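
As a rough illustration of the filter idea (not necessarily how a real job would be written), the right side can be pruned with a broadcast left-semi join on the distinct left keys before the real join; the column name i and the sample values follow the unit test used later:

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.functions.broadcast

    val spark = SparkSession.builder().master("local[1]").appName("prejoin-filter-demo").getOrCreate()
    import spark.implicits._

    val left  = Seq(1 -> 1, 4 -> 4, 8 -> 8).toDF("i", "j")
    val right = Seq(16 -> 4112, 4 -> 68, 4 -> 69, 7 -> 119).toDF("i", "j")

    // Keep only the right rows whose key actually appears on the left, so the
    // sort-merge join never has to iterate rows that can never match.
    // A Bloom filter built over left("i") would be the approximate version of this step.
    val leftKeys    = left.select("i").distinct()
    val prunedRight = right.join(broadcast(leftKeys), Seq("i"), "left_semi")

    val joined = left.as("t1").join(prunedRight.as("t2"), $"t1.i" === $"t2.i", "left_outer")
    joined.show(false)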

Analyze

1. Download Spark source code

  • Make sure you have a Linux environment; my environment is Ubuntu 22.04 LTS
  • Run mvn clean -DskipTests -T 20 package first to eliminate any Maven compile issues, then refresh the Maven import
  • Spark version: 2.3, which means you need to switch to branch-2.3

2. Change a unit test

2.1 Details

To test the DataFrame processing, we hardcode the data inside a UT.

We pick the following UT to make this change:

  • Path: sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
  • Change:
    test("SPARK-20897: cached self-join should not fail") {
      withSQLConf((SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "0"), // force it to plan a sort merge join
        (SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "false")          // force Spark not to use codegen
      ) {
        // literals written as 0x... because the rows are later shown via print(), which outputs HEX values
        val df1 = Seq(0x1 -> 0x1, 0x4 -> 0x4, 0x8 -> 0x8).toDF("i", "j").as("t1").coalesce(1) // coalesce forces all the data into one partition
        val df2 = Seq(0x10 -> 0x1010, 0x4 -> 0x44, 0x4 -> 0x45, 0x7 -> 0x77)
          .toDF("i", "j").as("t2").coalesce(1)
        df1.join(df2, $"t1.i" === $"t2.i", "left_outer").explain(false)
        df1.join(df2, $"t1.i" === $"t2.i", "left_outer").show(100, false)
      }
    }

Then, we add some log output to the Spark source code:

  • Path: sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala
  • Change:
    protected override def doExecute(): RDD[InternalRow] = {

      /*... Ignore some unchanged code ...*/

      case LeftOuter =>
        val smjScanner = new SortMergeJoinScanner(
          streamedKeyGenerator = createLeftKeyGenerator(),
          bufferedKeyGenerator = createRightKeyGenerator(),
          keyOrdering,
          streamedIter = RowIterator.fromScala(leftIter),
          bufferedIter = RowIterator.fromScala(rightIter),
          inMemoryThreshold,
          spillThreshold
        )
        val rightNullRow = new GenericInternalRow(right.output.length)
        print("LeftOuter init in SparkPlan\n") // Indicates that Spark initializes the LeftOuterIterator
        new LeftOuterIterator(
          smjScanner, rightNullRow, boundCondition, resultProj, numOutputRows).toScala

    /*... Ignore some unchanged code ...*/

    private[joins] class SortMergeJoinScanner(
        streamedKeyGenerator: Projection,
        bufferedKeyGenerator: Projection,
        keyOrdering: Ordering[InternalRow],
        streamedIter: RowIterator,
        bufferedIter: RowIterator,
        inMemoryThreshold: Int,
        spillThreshold: Int) {
      private[this] var streamedRow: InternalRow = _
      private[this] var streamedRowKey: InternalRow = _
      private[this] var bufferedRow: InternalRow = _
      private[this] var bufferedRowKey: InternalRow = _
      private[this] var matchJoinKey: InternalRow = _
      private[this] val bufferedMatches =
        new ExternalAppendOnlyUnsafeRowArray(inMemoryThreshold, spillThreshold)

      print(s"${this.hashCode()} SortMergeJoinScanner init\n") // We need to observe that Spark reads the right table first, before the left table, so we add this log here
      advancedBufferedToRowWithNullFreeJoinKey()

      /*... Ignore some unchanged code ...*/

      final def findNextOuterJoinRows(): Boolean = { // This is the main driver function that makes the executor iterate over the left and right tables
        print("findNextOuterJoinRows() \n") // we add a log to identify when it enters this function
        if (!advancedStreamed()) {
          matchJoinKey = null

      /*... Ignore some unchanged code ...*/

      private def advancedStreamed(): Boolean = { // advancedStreamed() drives the left table's iterator to read the left table
        if (streamedIter.advanceNext()) {
          streamedRow = streamedIter.getRow
          streamedRowKey = streamedKeyGenerator(streamedRow)
          // Add a log to identify the left table reading
          print(s"${this.hashCode()} advancedStreamed() streamedIter.advanceNext()=true streamedRow: ${streamedRow}\n")
          true
        } else {
          streamedRow = null
          streamedRowKey = null
          // Add a log to identify the left table reading
          print(s"${this.hashCode()} advancedStreamed() streamedIter.advanceNext()=false streamedRow: ${streamedRow}\n")
          false
        }
      }

      private def advancedBufferedToRowWithNullFreeJoinKey(): Boolean = { // This function drives the right table's iterator to read the right table, skipping rows whose join key is null
        var foundRow: Boolean = false
        while (!foundRow && bufferedIter.advanceNext()) {
          bufferedRow = bufferedIter.getRow
          // Add a log to identify the right table reading
          print(s"${this.hashCode()} [advancedBufferedToRowWithNullFreeJoinKey() bufferedRow in while: ${bufferedRow}, ")
          bufferedRowKey = bufferedKeyGenerator(bufferedRow)
          foundRow = !bufferedRowKey.anyNull
        }
        if (!foundRow) {
          // Add a log to identify the right table reading
          print(s"${this.hashCode()} advancedBufferedToRowWithNullFreeJoinKey() not found Row, current is ${bufferedRow} ]\n")
          bufferedRow = null
          bufferedRowKey = null
          false
        } else {
          // Add a log to identify the right table reading
          print(s"${this.hashCode()} advancedBufferedToRowWithNullFreeJoinKey() found Row, current is ${bufferedRow} ]\n")
          true
        }
      }

      private def bufferMatchingRows(): Unit = { // This function does the actual "link" between the joined left data and the joined right data
        assert(streamedRowKey != null)
        assert(!streamedRowKey.anyNull)
        assert(bufferedRowKey != null)
        assert(!bufferedRowKey.anyNull)
        assert(keyOrdering.compare(streamedRowKey, bufferedRowKey) == 0)
        matchJoinKey = streamedRowKey.copy()
        bufferedMatches.clear()
        do {
          // When a left record and a right record get joined, the right record is stored inside bufferedMatches
          // It will be re-iterated later
          print(s"${this.hashCode()} bufferedMatches.add(bufferedRow.asInstanceOf[UnsafeRow])\n")
          bufferedMatches.add(bufferedRow.asInstanceOf[UnsafeRow])
          advancedBufferedToRowWithNullFreeJoinKey()
        } while (bufferedRow != null && keyOrdering.compare(streamedRowKey, bufferedRowKey) == 0)
      }

    private class LeftOuterIterator(
        smjScanner: SortMergeJoinScanner,
        rightNullRow: InternalRow,
        boundCondition: InternalRow => Boolean,
        resultProj: InternalRow => InternalRow,
        numOutputRows: SQLMetric)
      extends OneSideOuterIterator(
        smjScanner, rightNullRow, boundCondition, resultProj, numOutputRows) {
      // We add a log here to verify that 1. the left outer join path is activated
      // and 2. to record the hash code
      print(s"${this.hashCode()} LeftOuterIterator init\n")

      protected override def setStreamSideOutput(row: InternalRow): Unit = joinedRow.withLeft(row)
      protected override def setBufferedSideOutput(row: InternalRow): Unit = joinedRow.withRight(row)

      /*... Ignore some unchanged code ...*/

      private def advanceStream(): Boolean = {
        print(s"${this.hashCode()} OneSideOuterIterator.advanceStream()\n") // This advanceStream() is the LeftOuterIterator's advanceStream
        rightMatchesIterator = null
        if (smjScanner.findNextOuterJoinRows()) {
          setStreamSideOutput(smjScanner.getStreamedRow)
          if (smjScanner.getBufferedMatches.isEmpty) {
            setBufferedSideOutput(bufferedSideNullRow)
          } else {
            // Find the next row in the buffer that satisfies the bound condition
            if (!advanceBufferUntilBoundConditionSatisfied()) {
              setBufferedSideOutput(bufferedSideNullRow)
            }
          }
          true
        } else {
          // Stream has been exhausted
          false
        }
      }

      // This function iterates over bufferedMatches again, to build the real joined records and output them
      private def advanceBufferUntilBoundConditionSatisfied(): Boolean = {
        // Add a log here to prove that advanceBufferUntilBoundConditionSatisfied() is called
        print(s"${this.hashCode()} OneSideOuterIterator.advanceBufferUntilBoundConditionSatisfied()\n")
        var foundMatch: Boolean = false
        if (rightMatchesIterator == null) {
          rightMatchesIterator = smjScanner.getBufferedMatches.generateIterator()
          // Add a log here to see when it gets the buffered matches
          print(s"${this.hashCode()} advanceBufferUntilBoundConditionSatisfied() arightMatchesIterator = smjScanner.getBufferedMatches.generateIterator() = ${rightMatchesIterator.hashCode()} \n")
        }

        while (!foundMatch && rightMatchesIterator.hasNext) {
          // next() is called only once per loop iteration, bound to a val so it can be logged
          val nextMatch = rightMatchesIterator.next()
          // Add a log here to see the content of bufferedMatches, to prove that it stores the joined right-table records
          print(s"rightMatchesIterator ${rightMatchesIterator.hashCode()} has ${nextMatch}")
          setBufferedSideOutput(nextMatch)
          foundMatch = boundCondition(joinedRow)
        }
        foundMatch
      }

      // LeftOuterIterator's advanceNext() always tries to output a buffered joined record first, then drives its advanceStream(), which in turn drives SortMergeJoinScanner.findNextOuterJoinRows()
      override def advanceNext(): Boolean = {
        print(s"${this.hashCode()} OneSideOuterIterator.advanceNext()\n")
        val r = advanceBufferUntilBoundConditionSatisfied() || advanceStream()
        if (r) numOutputRows += 1
        r
      }
      override def getRow: InternalRow = resultProj(joinedRow)
    }

Please refer to this Commit:

3. Run and analyze the log

Mapping between the left-side and right-side data visits (the /* n. ... */ markers refer to the annotated log below):

Row 1
  • left side log trace: /* 2. Left #1 */
  • left side data visit: LeftOuterIterator calls its advanceStream(), which drives findNextOuterJoinRows(), which calls the left table's advanceNext()
  • left table partition row: 1 -> 1
  • right table partition row: 4 -> 44
  • right side data visit: 1. advancedBufferedToRowWithNullFreeJoinKey() when SortMergeJoinScanner is initialized; 2. this record is iterated AGAIN inside the buffer
  • right side log trace: 1. /* 1. Right #1 */  2. /* 6. Right #1 */

Row 2
  • left side log trace: /* 3. Left #2 */
  • left side data visit: after the 1st record cannot be joined, the caller wants to iterate again, so LeftOuterIterator's advanceNext() is called, and then advanceStream() as well
  • left table partition row: 4 -> 4
  • right table partition row: 4 -> 45
  • right side data visit: 1. advancedBufferedToRowWithNullFreeJoinKey() while doing the join; 2. this record is iterated AGAIN inside the buffer
  • right side log trace: 1. /* 4. Right #2 */  2. /* 7. Right #2 */

Row 3
  • left side log trace: /* 8. Left #3 */
  • left side data visit: advancedStreamed(), and then it is not joined
  • left table partition row: 8 -> 8
  • right table partition row: 7 -> 77
  • right side data visit: advancedBufferedToRowWithNullFreeJoinKey() while doing the join, and then it is not joined
  • right side log trace: /* 5. Right #3 */

Row 4
  • right table partition row: 10 -> 1010
  • right side data visit: advancedBufferedToRowWithNullFreeJoinKey(), and then it is not joined
  • right side log trace: /* 9. Right #4 */
Object and its hash code in the log:

  • SortMergeJoinScanner: 104408849
  • LeftOuterIterator: 165384710
/home/yzruntime/.jdks/corretto-1.8.0_402/bin/java -javaagent:/opt/idea-IC-233.14475.28/lib/idea_rt.jar=42213:/opt/idea-IC-233.14475.28/bin -Dfile.encoding=UTF-8 -classpath /home/yzruntime/.local/share/JetBrains/IdeaIC2023.3/Scala/lib/runners.jar:/home/yzruntime/.jdks/corretto-1.8.0_402/jre/lib/charsets.jar:/home/yzruntime/.jdks/corretto-1.8.0_402/jre/lib/ext/cldrdata.jar:/home/yzruntime/.jdks/corretto-1.8.0_402/jre/lib/ext/dnsns.jar:/home/yzruntime/.jdks/corretto-1.8.0_402/jre/lib/ext/jaccess.jar:/home/yzruntime/.jdks/corretto-1.8.0_402/jre/lib/ext/jfxrt.jar:/home/yzruntime/.jdks/corretto-1.8.0_402/jre/lib/ext/localedata.jar:/home/yzruntime/.jdks/corretto-1.8.0_402/jre/lib/ext/nashorn.jar:/home/yzruntime/.jdks/corretto-1.8.0_402/jre/lib/ext/sunec.jar:/home/yzruntime/.jdks/corretto-1.8.0_402/jre/lib/ext/sunjce_provider.jar:/home/yzruntime/.jdks/corretto-1.8.0_402/jre/lib/ext/sunpkcs11.jar:/home/yzruntime/.jdks/corretto-1.8.0_402/jre/lib/ext/zipfs.jar:/home/yzruntime/.jdks/corretto-1.8.0_402/jre/lib/jce.jar:/home/yzruntime/.jdks/corretto-1.8.0_402/jre/lib/jfr.jar:/home/yzruntime/.jdks/corretto-1.8.0_402/jre/lib/jfxswt.jar:/home/yzruntime/.jdks/corretto-1.8.0_402/jre/lib/jsse.jar:/home/yzruntime/.jdks/corretto-1.8.0_402/jre/lib/management-agent.jar:/home/yzruntime/.jdks/corretto-1.8.0_402/jre/lib/resources.jar:/home/yzruntime/.jdks/corretto-1.8.0_402/jre/lib/rt.jar:/home/yzruntime/GitProjSrc/_forked_spark/spark/sql/core/target/scala-2.11/test-classes:/home/yzruntime/GitProjSrc/_forked_spark/spark/sql/core/target/scala-2.11/classes:/home/yzruntime/.m2/repository/com/univocity/univocity-parsers/2.5.9/univocity-parsers-2.5.9.jar:/home/yzruntime/GitProjSrc/_forked_spark/spark/common/sketch/target/scala-2.11/classes:/home/yzruntime/GitProjSrc/_forked_spark/spark/core/target/scala-2.11/classes:/home/yzruntime/.m2/repository/org/apache/avro/avro-mapred/1.7.7/avro-mapred-1.7.7-hadoop2.jar:/home/yzruntime/.m2/repository/org/apache/avro/avro-ipc/1.7.7/avro-ipc-1.7.7.jar:/home/yzruntime/.m2/repository/org/apache/avro/avro-ipc/1.7.7/avro-ipc-1.7.7-tests.jar:/home/yzruntime/.m2/repository/com/twitter/chill_2.11/0.8.4/chill_2.11-0.8.4.jar:/home/yzruntime/.m2/repository/com/twitter/chill-java/0.8.4/chill-java-0.8.4.jar:/home/yzruntime/.m2/repository/org/apache/hadoop/hadoop-client/2.6.5/hadoop-client-2.6.5.jar:/home/yzruntime/.m2/repository/org/apache/hadoop/hadoop-common/2.6.5/hadoop-common-2.6.5.jar:/home/yzruntime/.m2/repository/commons-cli/commons-cli/1.2/commons-cli-1.2.jar:/home/yzruntime/.m2/repository/xmlenc/xmlenc/0.52/xmlenc-0.52.jar:/home/yzruntime/.m2/repository/commons-httpclient/commons-httpclient/3.1/commons-httpclient-3.1.jar:/home/yzruntime/.m2/repository/commons-io/commons-io/2.4/commons-io-2.4.jar:/home/yzruntime/.m2/repository/commons-collections/commons-collections/3.2.2/commons-collections-3.2.2.jar:/home/yzruntime/.m2/repository/commons-configuration/commons-configuration/1.6/commons-configuration-1.6.jar:/home/yzruntime/.m2/repository/commons-digester/commons-digester/1.8/commons-digester-1.8.jar:/home/yzruntime/.m2/repository/commons-beanutils/commons-beanutils/1.7.0/commons-beanutils-1.7.0.jar:/home/yzruntime/.m2/repository/commons-beanutils/commons-beanutils-core/1.8.0/commons-beanutils-core-1.8.0.jar:/home/yzruntime/.m2/repository/com/google/code/gson/gson/2.2.4/gson-2.2.4.jar:/home/yzruntime/.m2/repository/org/apache/hadoop/hadoop-auth/2.6.5/hadoop-auth-2.6.5.jar:/home/yzruntime/.m2/repository/org/apache/directory/server/apacheds-kerberos-codec/2.0.0-M15/apacheds-kerberos-codec-2
.0.0-M15.jar:/home/yzruntime/.m2/repository/org/apache/directory/server/apacheds-i18n/2.0.0-M15/apacheds-i18n-2.0.0-M15.jar:/home/yzruntime/.m2/repository/org/apache/directory/api/api-asn1-api/1.0.0-M20/api-asn1-api-1.0.0-M20.jar:/home/yzruntime/.m2/repository/org/apache/directory/api/api-util/1.0.0-M20/api-util-1.0.0-M20.jar:/home/yzruntime/.m2/repository/org/apache/curator/curator-client/2.6.0/curator-client-2.6.0.jar:/home/yzruntime/.m2/repository/org/htrace/htrace-core/3.0.4/htrace-core-3.0.4.jar:/home/yzruntime/.m2/repository/org/apache/hadoop/hadoop-hdfs/2.6.5/hadoop-hdfs-2.6.5.jar:/home/yzruntime/.m2/repository/org/mortbay/jetty/jetty-util/6.1.26/jetty-util-6.1.26.jar:/home/yzruntime/.m2/repository/xerces/xercesImpl/2.9.1/xercesImpl-2.9.1.jar:/home/yzruntime/.m2/repository/xml-apis/xml-apis/1.4.01/xml-apis-1.4.01.jar:/home/yzruntime/.m2/repository/org/apache/hadoop/hadoop-mapreduce-client-app/2.6.5/hadoop-mapreduce-client-app-2.6.5.jar:/home/yzruntime/.m2/repository/org/apache/hadoop/hadoop-mapreduce-client-common/2.6.5/hadoop-mapreduce-client-common-2.6.5.jar:/home/yzruntime/.m2/repository/org/apache/hadoop/hadoop-yarn-client/2.6.5/hadoop-yarn-client-2.6.5.jar:/home/yzruntime/.m2/repository/org/apache/hadoop/hadoop-yarn-server-common/2.6.5/hadoop-yarn-server-common-2.6.5.jar:/home/yzruntime/.m2/repository/org/apache/hadoop/hadoop-mapreduce-client-shuffle/2.6.5/hadoop-mapreduce-client-shuffle-2.6.5.jar:/home/yzruntime/.m2/repository/org/apache/hadoop/hadoop-yarn-api/2.6.5/hadoop-yarn-api-2.6.5.jar:/home/yzruntime/.m2/repository/org/apache/hadoop/hadoop-mapreduce-client-core/2.6.5/hadoop-mapreduce-client-core-2.6.5.jar:/home/yzruntime/.m2/repository/org/apache/hadoop/hadoop-yarn-common/2.6.5/hadoop-yarn-common-2.6.5.jar:/home/yzruntime/.m2/repository/javax/xml/bind/jaxb-api/2.2.2/jaxb-api-2.2.2.jar:/home/yzruntime/.m2/repository/javax/xml/stream/stax-api/1.0-2/stax-api-1.0-2.jar:/home/yzruntime/.m2/repository/org/codehaus/jackson/jackson-jaxrs/1.9.13/jackson-jaxrs-1.9.13.jar:/home/yzruntime/.m2/repository/org/codehaus/jackson/jackson-xc/1.9.13/jackson-xc-1.9.13.jar:/home/yzruntime/.m2/repository/org/apache/hadoop/hadoop-mapreduce-client-jobclient/2.6.5/hadoop-mapreduce-client-jobclient-2.6.5.jar:/home/yzruntime/.m2/repository/org/apache/hadoop/hadoop-annotations/2.6.5/hadoop-annotations-2.6.5.jar:/home/yzruntime/GitProjSrc/_forked_spark/spark/launcher/target/scala-2.11/classes:/home/yzruntime/GitProjSrc/_forked_spark/spark/common/kvstore/target/scala-2.11/classes:/home/yzruntime/.m2/repository/org/fusesource/leveldbjni/leveldbjni-all/1.8/leveldbjni-all-1.8.jar:/home/yzruntime/GitProjSrc/_forked_spark/spark/common/network-common/target/scala-2.11/classes:/home/yzruntime/GitProjSrc/_forked_spark/spark/common/network-shuffle/target/scala-2.11/classes:/home/yzruntime/GitProjSrc/_forked_spark/spark/common/unsafe/target/scala-2.11/classes:/home/yzruntime/.m2/repository/net/java/dev/jets3t/jets3t/0.9.4/jets3t-0.9.4.jar:/home/yzruntime/.m2/repository/org/apache/httpcomponents/httpcore/4.4.8/httpcore-4.4.8.jar:/home/yzruntime/.m2/repository/org/apache/httpcomponents/httpclient/4.5.4/httpclient-4.5.4.jar:/home/yzruntime/.m2/repository/javax/activation/activation/1.1.1/activation-1.1.1.jar:/home/yzruntime/.m2/repository/org/bouncycastle/bcprov-jdk15on/1.58/bcprov-jdk15on-1.58.jar:/home/yzruntime/.m2/repository/com/jamesmurty/utils/java-xmlbuilder/1.1/java-xmlbuilder-1.1.jar:/home/yzruntime/.m2/repository/net/iharder/base64/2.3.8/base64-2.3.8.jar:/home/yzruntime/.m2/repository/org/apache/curator/
curator-recipes/2.6.0/curator-recipes-2.6.0.jar:/home/yzruntime/.m2/repository/org/apache/curator/curator-framework/2.6.0/curator-framework-2.6.0.jar:/home/yzruntime/.m2/repository/org/apache/zookeeper/zookeeper/3.4.6/zookeeper-3.4.6.jar:/home/yzruntime/.m2/repository/org/eclipse/jetty/jetty-plus/9.3.24.v20180605/jetty-plus-9.3.24.v20180605.jar:/home/yzruntime/.m2/repository/org/eclipse/jetty/jetty-webapp/9.3.24.v20180605/jetty-webapp-9.3.24.v20180605.jar:/home/yzruntime/.m2/repository/org/eclipse/jetty/jetty-xml/9.3.24.v20180605/jetty-xml-9.3.24.v20180605.jar:/home/yzruntime/.m2/repository/org/eclipse/jetty/jetty-jndi/9.3.24.v20180605/jetty-jndi-9.3.24.v20180605.jar:/home/yzruntime/.m2/repository/org/eclipse/jetty/jetty-security/9.3.24.v20180605/jetty-security-9.3.24.v20180605.jar:/home/yzruntime/.m2/repository/org/eclipse/jetty/jetty-util/9.3.24.v20180605/jetty-util-9.3.24.v20180605.jar:/home/yzruntime/.m2/repository/org/eclipse/jetty/jetty-server/9.3.24.v20180605/jetty-server-9.3.24.v20180605.jar:/home/yzruntime/.m2/repository/org/eclipse/jetty/jetty-io/9.3.24.v20180605/jetty-io-9.3.24.v20180605.jar:/home/yzruntime/.m2/repository/org/eclipse/jetty/jetty-http/9.3.24.v20180605/jetty-http-9.3.24.v20180605.jar:/home/yzruntime/.m2/repository/org/eclipse/jetty/jetty-continuation/9.3.24.v20180605/jetty-continuation-9.3.24.v20180605.jar:/home/yzruntime/.m2/repository/org/eclipse/jetty/jetty-proxy/9.3.24.v20180605/jetty-proxy-9.3.24.v20180605.jar:/home/yzruntime/.m2/repository/org/eclipse/jetty/jetty-client/9.3.24.v20180605/jetty-client-9.3.24.v20180605.jar:/home/yzruntime/.m2/repository/org/eclipse/jetty/jetty-servlets/9.3.24.v20180605/jetty-servlets-9.3.24.v20180605.jar:/home/yzruntime/.m2/repository/javax/servlet/javax.servlet-api/3.1.0/javax.servlet-api-3.1.0.jar:/home/yzruntime/.m2/repository/org/apache/commons/commons-lang3/3.5/commons-lang3-3.5.jar:/home/yzruntime/.m2/repository/org/apache/commons/commons-math3/3.4.1/commons-math3-3.4.1.jar:/home/yzruntime/.m2/repository/com/google/code/findbugs/jsr305/1.3.9/jsr305-1.3.9.jar:/home/yzruntime/.m2/repository/org/slf4j/slf4j-api/1.7.16/slf4j-api-1.7.16.jar:/home/yzruntime/.m2/repository/org/slf4j/jul-to-slf4j/1.7.16/jul-to-slf4j-1.7.16.jar:/home/yzruntime/.m2/repository/org/slf4j/jcl-over-slf4j/1.7.16/jcl-over-slf4j-1.7.16.jar:/home/yzruntime/.m2/repository/log4j/log4j/1.2.17/log4j-1.2.17.jar:/home/yzruntime/.m2/repository/org/slf4j/slf4j-log4j12/1.7.16/slf4j-log4j12-1.7.16.jar:/home/yzruntime/.m2/repository/com/ning/compress-lzf/1.0.3/compress-lzf-1.0.3.jar:/home/yzruntime/.m2/repository/org/xerial/snappy/snappy-java/1.1.2.6/snappy-java-1.1.2.6.jar:/home/yzruntime/.m2/repository/org/lz4/lz4-java/1.4.0/lz4-java-1.4.0.jar:/home/yzruntime/.m2/repository/com/github/luben/zstd-jni/1.3.2-2/zstd-jni-1.3.2-2.jar:/home/yzruntime/.m2/repository/org/roaringbitmap/RoaringBitmap/0.7.45/RoaringBitmap-0.7.45.jar:/home/yzruntime/.m2/repository/org/roaringbitmap/shims/0.7.45/shims-0.7.45.jar:/home/yzruntime/.m2/repository/commons-net/commons-net/2.2/commons-net-2.2.jar:/home/yzruntime/.m2/repository/org/scala-lang/scala-library/2.11.8/scala-library-2.11.8.jar:/home/yzruntime/.m2/repository/org/json4s/json4s-jackson_2.11/3.2.11/json4s-jackson_2.11-3.2.11.jar:/home/yzruntime/.m2/repository/org/json4s/json4s-core_2.11/3.2.11/json4s-core_2.11-3.2.11.jar:/home/yzruntime/.m2/repository/org/json4s/json4s-ast_2.11/3.2.11/json4s-ast_2.11-3.2.11.jar:/home/yzruntime/.m2/repository/org/scala-lang/scalap/2.11.8/scalap-2.11.8.jar:/home/yzruntime/.m2/repository/org/scala-la
ng/scala-compiler/2.11.8/scala-compiler-2.11.8.jar:/home/yzruntime/.m2/repository/org/glassfish/jersey/core/jersey-client/2.22.2/jersey-client-2.22.2.jar:/home/yzruntime/.m2/repository/javax/ws/rs/javax.ws.rs-api/2.0.1/javax.ws.rs-api-2.0.1.jar:/home/yzruntime/.m2/repository/org/glassfish/hk2/hk2-api/2.4.0-b34/hk2-api-2.4.0-b34.jar:/home/yzruntime/.m2/repository/org/glassfish/hk2/hk2-utils/2.4.0-b34/hk2-utils-2.4.0-b34.jar:/home/yzruntime/.m2/repository/org/glassfish/hk2/external/aopalliance-repackaged/2.4.0-b34/aopalliance-repackaged-2.4.0-b34.jar:/home/yzruntime/.m2/repository/org/glassfish/hk2/external/javax.inject/2.4.0-b34/javax.inject-2.4.0-b34.jar:/home/yzruntime/.m2/repository/org/glassfish/hk2/hk2-locator/2.4.0-b34/hk2-locator-2.4.0-b34.jar:/home/yzruntime/.m2/repository/org/javassist/javassist/3.18.1-GA/javassist-3.18.1-GA.jar:/home/yzruntime/.m2/repository/org/glassfish/jersey/core/jersey-common/2.22.2/jersey-common-2.22.2.jar:/home/yzruntime/.m2/repository/javax/annotation/javax.annotation-api/1.2/javax.annotation-api-1.2.jar:/home/yzruntime/.m2/repository/org/glassfish/jersey/bundles/repackaged/jersey-guava/2.22.2/jersey-guava-2.22.2.jar:/home/yzruntime/.m2/repository/org/glassfish/hk2/osgi-resource-locator/1.0.1/osgi-resource-locator-1.0.1.jar:/home/yzruntime/.m2/repository/org/glassfish/jersey/core/jersey-server/2.22.2/jersey-server-2.22.2.jar:/home/yzruntime/.m2/repository/org/glassfish/jersey/media/jersey-media-jaxb/2.22.2/jersey-media-jaxb-2.22.2.jar:/home/yzruntime/.m2/repository/javax/validation/validation-api/1.1.0.Final/validation-api-1.1.0.Final.jar:/home/yzruntime/.m2/repository/org/glassfish/jersey/containers/jersey-container-servlet/2.22.2/jersey-container-servlet-2.22.2.jar:/home/yzruntime/.m2/repository/org/glassfish/jersey/containers/jersey-container-servlet-core/2.22.2/jersey-container-servlet-core-2.22.2.jar:/home/yzruntime/.m2/repository/io/netty/netty-all/4.1.17.Final/netty-all-4.1.17.Final.jar:/home/yzruntime/.m2/repository/io/netty/netty/3.9.9.Final/netty-3.9.9.Final.jar:/home/yzruntime/.m2/repository/com/clearspring/analytics/stream/2.7.0/stream-2.7.0.jar:/home/yzruntime/.m2/repository/io/dropwizard/metrics/metrics-core/3.1.5/metrics-core-3.1.5.jar:/home/yzruntime/.m2/repository/io/dropwizard/metrics/metrics-jvm/3.1.5/metrics-jvm-3.1.5.jar:/home/yzruntime/.m2/repository/io/dropwizard/metrics/metrics-json/3.1.5/metrics-json-3.1.5.jar:/home/yzruntime/.m2/repository/io/dropwizard/metrics/metrics-graphite/3.1.5/metrics-graphite-3.1.5.jar:/home/yzruntime/.m2/repository/com/fasterxml/jackson/module/jackson-module-scala_2.11/2.6.7.1/jackson-module-scala_2.11-2.6.7.1.jar:/home/yzruntime/.m2/repository/com/fasterxml/jackson/module/jackson-module-paranamer/2.7.9/jackson-module-paranamer-2.7.9.jar:/home/yzruntime/.m2/repository/org/apache/ivy/ivy/2.4.0/ivy-2.4.0.jar:/home/yzruntime/.m2/repository/oro/oro/2.0.8/oro-2.0.8.jar:/home/yzruntime/.m2/repository/net/razorvine/pyrolite/4.13/pyrolite-4.13.jar:/home/yzruntime/.m2/repository/net/sf/py4j/py4j/0.10.7/py4j-0.10.7.jar:/home/yzruntime/.m2/repository/org/apache/commons/commons-crypto/1.0.0/commons-crypto-1.0.0.jar:/home/yzruntime/GitProjSrc/_forked_spark/spark/core/target/scala-2.11/test-classes:/home/yzruntime/GitProjSrc/_forked_spark/spark/sql/catalyst/target/scala-2.11/classes:/home/yzruntime/.m2/repository/org/scala-lang/scala-reflect/2.11.8/scala-reflect-2.11.8.jar:/home/yzruntime/.m2/repository/org/scala-lang/modules/scala-parser-combinators_2.11/1.0.4/scala-parser-combinators_2.11-1.0.4.jar:/home/yzruntime/.m2
/repository/org/codehaus/janino/janino/3.0.8/janino-3.0.8.jar:/home/yzruntime/.m2/repository/org/codehaus/janino/commons-compiler/3.0.8/commons-compiler-3.0.8.jar:/home/yzruntime/.m2/repository/org/antlr/antlr4-runtime/4.7/antlr4-runtime-4.7.jar:/home/yzruntime/.m2/repository/commons-codec/commons-codec/1.10/commons-codec-1.10.jar:/home/yzruntime/GitProjSrc/_forked_spark/spark/sql/catalyst/target/scala-2.11/test-classes:/home/yzruntime/GitProjSrc/_forked_spark/spark/common/tags/target/scala-2.11/classes:/home/yzruntime/GitProjSrc/_forked_spark/spark/common/tags/target/scala-2.11/test-classes:/home/yzruntime/.m2/repository/org/apache/orc/orc-core/1.4.4/orc-core-1.4.4-nohive.jar:/home/yzruntime/.m2/repository/com/google/protobuf/protobuf-java/2.5.0/protobuf-java-2.5.0.jar:/home/yzruntime/.m2/repository/commons-lang/commons-lang/2.6/commons-lang-2.6.jar:/home/yzruntime/.m2/repository/io/airlift/aircompressor/0.8/aircompressor-0.8.jar:/home/yzruntime/.m2/repository/org/apache/orc/orc-mapreduce/1.4.4/orc-mapreduce-1.4.4-nohive.jar:/home/yzruntime/.m2/repository/com/esotericsoftware/kryo-shaded/3.0.3/kryo-shaded-3.0.3.jar:/home/yzruntime/.m2/repository/com/esotericsoftware/minlog/1.3.0/minlog-1.3.0.jar:/home/yzruntime/.m2/repository/com/google/guava/guava/14.0.1/guava-14.0.1.jar:/home/yzruntime/.m2/repository/org/apache/parquet/parquet-column/1.8.3/parquet-column-1.8.3.jar:/home/yzruntime/.m2/repository/org/apache/parquet/parquet-common/1.8.3/parquet-common-1.8.3.jar:/home/yzruntime/.m2/repository/org/apache/parquet/parquet-encoding/1.8.3/parquet-encoding-1.8.3.jar:/home/yzruntime/.m2/repository/org/apache/parquet/parquet-hadoop/1.8.3/parquet-hadoop-1.8.3.jar:/home/yzruntime/.m2/repository/org/apache/parquet/parquet-format/2.3.1/parquet-format-2.3.1.jar:/home/yzruntime/.m2/repository/org/apache/parquet/parquet-jackson/1.8.3/parquet-jackson-1.8.3.jar:/home/yzruntime/.m2/repository/org/codehaus/jackson/jackson-mapper-asl/1.9.13/jackson-mapper-asl-1.9.13.jar:/home/yzruntime/.m2/repository/org/codehaus/jackson/jackson-core-asl/1.9.13/jackson-core-asl-1.9.13.jar:/home/yzruntime/.m2/repository/org/eclipse/jetty/jetty-servlet/9.3.24.v20180605/jetty-servlet-9.3.24.v20180605.jar:/home/yzruntime/.m2/repository/com/fasterxml/jackson/core/jackson-databind/2.6.7.1/jackson-databind-2.6.7.1.jar:/home/yzruntime/.m2/repository/com/fasterxml/jackson/core/jackson-annotations/2.6.7/jackson-annotations-2.6.7.jar:/home/yzruntime/.m2/repository/com/fasterxml/jackson/core/jackson-core/2.6.7/jackson-core-2.6.7.jar:/home/yzruntime/.m2/repository/org/apache/arrow/arrow-vector/0.8.0/arrow-vector-0.8.0.jar:/home/yzruntime/.m2/repository/org/apache/arrow/arrow-format/0.8.0/arrow-format-0.8.0.jar:/home/yzruntime/.m2/repository/org/apache/arrow/arrow-memory/0.8.0/arrow-memory-0.8.0.jar:/home/yzruntime/.m2/repository/joda-time/joda-time/2.9.3/joda-time-2.9.3.jar:/home/yzruntime/.m2/repository/com/carrotsearch/hppc/0.7.2/hppc-0.7.2.jar:/home/yzruntime/.m2/repository/com/vlkan/flatbuffers/1.2.0-3f79e055/flatbuffers-1.2.0-3f79e055.jar:/home/yzruntime/.m2/repository/org/apache/xbean/xbean-asm5-shaded/4.4/xbean-asm5-shaded-4.4.jar:/home/yzruntime/.m2/repository/org/scalacheck/scalacheck_2.11/1.13.5/scalacheck_2.11-1.13.5.jar:/home/yzruntime/.m2/repository/org/scala-sbt/test-interface/1.0/test-interface-1.0.jar:/home/yzruntime/.m2/repository/com/h2database/h2/1.4.195/h2-1.4.195.jar:/home/yzruntime/.m2/repository/mysql/mysql-connector-java/5.1.38/mysql-connector-java-5.1.38.jar:/home/yzruntime/.m2/repository/org/postgresql/postgresql/9
.4.1207.jre7/postgresql-9.4.1207.jre7.jar:/home/yzruntime/.m2/repository/org/apache/parquet/parquet-avro/1.8.3/parquet-avro-1.8.3.jar:/home/yzruntime/.m2/repository/it/unimi/dsi/fastutil/6.5.7/fastutil-6.5.7.jar:/home/yzruntime/.m2/repository/org/apache/avro/avro/1.8.1/avro-1.8.1.jar:/home/yzruntime/.m2/repository/com/thoughtworks/paranamer/paranamer/2.8/paranamer-2.8.jar:/home/yzruntime/.m2/repository/org/apache/commons/commons-compress/1.8.1/commons-compress-1.8.1.jar:/home/yzruntime/.m2/repository/org/tukaani/xz/1.5/xz-1.5.jar:/home/yzruntime/.m2/repository/org/mockito/mockito-core/1.10.19/mockito-core-1.10.19.jar:/home/yzruntime/.m2/repository/org/hamcrest/hamcrest-core/1.3/hamcrest-core-1.3.jar:/home/yzruntime/.m2/repository/org/objenesis/objenesis/2.1/objenesis-2.1.jar:/home/yzruntime/.m2/repository/org/spark-project/spark/unused/1.0.0/unused-1.0.0.jar:/home/yzruntime/.m2/repository/org/scalatest/scalatest_2.11/3.0.3/scalatest_2.11-3.0.3.jar:/home/yzruntime/.m2/repository/org/scalactic/scalactic_2.11/3.0.3/scalactic_2.11-3.0.3.jar:/home/yzruntime/.m2/repository/org/scala-lang/modules/scala-xml_2.11/1.0.5/scala-xml_2.11-1.0.5.jar:/home/yzruntime/.m2/repository/junit/junit/4.12/junit-4.12.jar:/home/yzruntime/.m2/repository/com/novocode/junit-interface/0.11/junit-interface-0.11.jar org.jetbrains.plugins.scala.testingSupport.scalaTest.ScalaTestRunner -s org.apache.spark.sql.DataFrameSuite -testName "SPARK-20897: cached self-join should not fail" -showProgressMessages true
Testing started at 4:35 pm ...


01:35:55.103 WARN org.apache.spark.util.Utils: Your hostname, yzhost resolves to a loopback address: 127.0.1.1; using 192.168.50.125 instead (on interface wlp5s0)
01:35:55.104 WARN org.apache.spark.util.Utils: Set SPARK_LOCAL_IP if you need to bind to another address
01:35:55.262 WARN org.apache.hadoop.util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable

== Physical Plan ==
SortMergeJoin [i#5], [i#16], LeftOuter
:- Sort [i#5 ASC NULLS FIRST], false, 0
: +- Coalesce 1
: +- LocalTableScan [i#5, j#6]
+- Sort [i#16 ASC NULLS FIRST], false, 0
+- Coalesce 1
+- LocalTableScan [i#16, j#17]
104408849 SortMergeJoinScanner init
/* 1. Right #1 */ 104408849 [advancedBufferedToRowWithNullFreeJoinKey() bufferedRow in while: [0,4,44], 104408849 advancedBufferedToRowWithNullFreeJoinKey() found Row, current is [0,4,44] ]
LeftOuter init in SparkPlan
165384710 LeftOuterIterator init
165384710 OneSideOuterIterator.advanceNext()
165384710 OneSideOuterIterator.advanceBufferUntilBoundConditionSatisfied()
165384710 advanceBufferUntilBoundConditionSatisfied() arightMatchesIterator = smjScanner.getBufferedMatches.generateIterator() = 1252938438
165384710 OneSideOuterIterator.advanceStream()
findNextOuterJoinRows()
/* 2. Left #1 */ 104408849 advancedStreamed() streamedIter.advanceNext()=true streamedRow: [0,1,1]
165384710 OneSideOuterIterator.advanceNext()
165384710 OneSideOuterIterator.advanceBufferUntilBoundConditionSatisfied()
165384710 advanceBufferUntilBoundConditionSatisfied() arightMatchesIterator = smjScanner.getBufferedMatches.generateIterator() = 2135241262
165384710 OneSideOuterIterator.advanceStream()
findNextOuterJoinRows()
/* 3. Left #2 */ 104408849 advancedStreamed() streamedIter.advanceNext()=true streamedRow: [0,4,4]
/* 3.1 Right #1 got joined, then it was put inside the buffer */ 104408849 bufferedMatches.add(bufferedRow.asInstanceOf[UnsafeRow])
/* 4. Right #2 */ 104408849 [advancedBufferedToRowWithNullFreeJoinKey() bufferedRow in while: [0,4,45], 104408849 advancedBufferedToRowWithNullFreeJoinKey() found Row, current is [0,4,45] ]
/* 4.1 Right #2 got joined, then it was put inside the buffer */ 104408849 bufferedMatches.add(bufferedRow.asInstanceOf[UnsafeRow])
/* 5. Right #3 */ 104408849 [advancedBufferedToRowWithNullFreeJoinKey() bufferedRow in while: [0,7,77], 104408849 advancedBufferedToRowWithNullFreeJoinKey() found Row, current is [0,7,77] ]
165384710 OneSideOuterIterator.advanceBufferUntilBoundConditionSatisfied()
165384710 advanceBufferUntilBoundConditionSatisfied() arightMatchesIterator = smjScanner.getBufferedMatches.generateIterator() = 1621560499
/* 6. Right #1 */ rightMatchesIterator 1621560499 has [0,4,44]165384710 OneSideOuterIterator.advanceNext()
165384710 OneSideOuterIterator.advanceBufferUntilBoundConditionSatisfied()
/* 7. Right #2 */ rightMatchesIterator 1621560499 has [0,4,45]165384710 OneSideOuterIterator.advanceNext()
165384710 OneSideOuterIterator.advanceBufferUntilBoundConditionSatisfied()
165384710 OneSideOuterIterator.advanceStream()
findNextOuterJoinRows()
/* 8. Left #3 */ 104408849 advancedStreamed() streamedIter.advanceNext()=true streamedRow: [0,8,8]
/* 9. Right #4 */ 104408849 [advancedBufferedToRowWithNullFreeJoinKey() bufferedRow in while: [0,10,1010], 104408849 advancedBufferedToRowWithNullFreeJoinKey() found Row, current is [0,10,1010] ]
165384710 OneSideOuterIterator.advanceNext()
165384710 OneSideOuterIterator.advanceBufferUntilBoundConditionSatisfied()
165384710 advanceBufferUntilBoundConditionSatisfied() arightMatchesIterator = smjScanner.getBufferedMatches.generateIterator() = 2010570488
165384710 OneSideOuterIterator.advanceStream()
findNextOuterJoinRows()
104408849 advancedStreamed() streamedIter.advanceNext()=false streamedRow: null
+---+---+----+----+
|i |j |i |j |
+---+---+----+----+
|1 |1 |null|null|
|4 |4 |4 |68 |
|4 |4 |4 |69 |
|8 |8 |null|null|
+---+---+----+----+




Process finished with exit code 0

Reference

Issue detail

After the shuffle partition count was changed from 800 to 3200, a job with no join, no aggregation and no collect() inside the Spark driver failed with the following error.

Config Detail

  • Spark Version: 2.3.2
  • DataFrame count: 77 million rows
  • DataFrame column count: around 20 columns
  • Spark submit mode: Yarn-Cluster
  • Driver memory: 4 GB
  • Driver memory overhead: 2 GB
Log details
29-03-2024 09:51:54 INFO - 2024-03-29 09:51:54 JST INFO com.xxxxx.xxxxxx.xxx.xxx.executor.ExportExecutor: inputDfPath:hdfs://xxx/user/xxxx/xxx-xxx/xxx_xxx_xxx/data/dt=xxx
29-03-2024 09:51:54 INFO - 2024-03-29 09:51:54 JST INFO com.xxxxx.xxxxxx.xxx.xxx.reader.ReaderImpl: Reading data from HDFS path: [hdfs://xxx/user/xxxx/xxx-xxx/xxx_xxx_xxx/data/dt=xxx] schema: StructType(StructField(xxxxx))
29-03-2024 09:52:34 INFO - 2024-03-29 09:52:34 JST INFO com.xxxxx.xxxxxx.xxx.xxx.writer.WriterImpl: Writing data to HDFS format: [csv] mode:[overwrite] separator:[ ] compression:[gzip] path:[hdfs://xxx/user/xxxx/xxx-xxx/xxx_xxx_xxx/data/output/] header:[false] encoding:[UTF-8]
29-03-2024 09:52:34 INFO - 2024-03-29 09:52:34 JST WARN org.apache.spark.util.Utils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.debug.maxToStringFields' in SparkEnv.conf.
29-03-2024 09:52:34 INFO - 2024-03-29 09:52:34 JST INFO com.hadoop.compression.lzo.GPLNativeCodeLoader: Loaded native gpl library
29-03-2024 09:52:34 INFO - 2024-03-29 09:52:34 JST INFO com.hadoop.compression.lzo.LzoCodec: Successfully loaded & initialized native-lzo library [hadoop-lzo rev bb4f4d562ec4888b1c6b0dec1ed7bc4b60229496]
29-03-2024 09:53:10 INFO - 2024-03-29 09:53:10 JST ERROR org.apache.spark.scheduler.TaskSetManager: Total size of serialized results of 219 tasks (1029.1 MB) is bigger than spark.driver.maxResultSize (1024.0 MB)
29-03-2024 09:53:10 INFO - 2024-03-29 09:53:10 JST ERROR org.apache.spark.sql.execution.datasources.FileFormatWriter: Aborting job null.
29-03-2024 09:53:10 INFO - org.apache.spark.SparkException: Job aborted due to stage failure: Total size of serialized results of 219 tasks (1029.1 MB) is bigger than spark.driver.maxResultSize (1024.0 MB)
29-03-2024 09:53:10 INFO - at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1651)
29-03-2024 09:53:10 INFO - at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1639)
29-03-2024 09:53:10 INFO - at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1638)
29-03-2024 09:53:10 INFO - at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
29-03-2024 09:53:10 INFO - at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
29-03-2024 09:53:10 INFO - at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1638)
29-03-2024 09:53:10 INFO - at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
29-03-2024 09:53:10 INFO - at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
29-03-2024 09:53:10 INFO - at scala.Option.foreach(Option.scala:257)
29-03-2024 09:53:10 INFO - at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:831)
29-03-2024 09:53:10 INFO - at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1872)
29-03-2024 09:53:10 INFO - at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1821)
29-03-2024 09:53:10 INFO - at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1810)
29-03-2024 09:53:10 INFO - at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
29-03-2024 09:53:10 INFO - at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:642)
29-03-2024 09:53:10 INFO - at org.apache.spark.SparkContext.runJob(SparkContext.scala:2039)
29-03-2024 09:53:10 INFO - at org.apache.spark.SparkContext.runJob(SparkContext.scala:2060)
29-03-2024 09:53:10 INFO - at org.apache.spark.SparkContext.runJob(SparkContext.scala:2079)
29-03-2024 09:53:10 INFO - at org.apache.spark.SparkContext.runJob(SparkContext.scala:2104)
29-03-2024 09:53:10 INFO - at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:945)
29-03-2024 09:53:10 INFO - at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
29-03-2024 09:53:10 INFO - at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
29-03-2024 09:53:10 INFO - at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
29-03-2024 09:53:10 INFO - at org.apache.spark.rdd.RDD.collect(RDD.scala:944)
29-03-2024 09:53:10 INFO - at org.apache.spark.RangePartitioner$.sketch(Partitioner.scala:309)
29-03-2024 09:53:10 INFO - at org.apache.spark.RangePartitioner.<init>(Partitioner.scala:171)
29-03-2024 09:53:10 INFO - at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec$.prepareShuffleDependency(ShuffleExchangeExec.scala:224)
29-03-2024 09:53:10 INFO - at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.prepareShuffleDependency(ShuffleExchangeExec.scala:91)
29-03-2024 09:53:10 INFO - at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec$$anonfun$doExecute$1.apply(ShuffleExchangeExec.scala:128)
29-03-2024 09:53:10 INFO - at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec$$anonfun$doExecute$1.apply(ShuffleExchangeExec.scala:119)
29-03-2024 09:53:10 INFO - at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:52)
29-03-2024 09:53:10 INFO - at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.doExecute(ShuffleExchangeExec.scala:119)
29-03-2024 09:53:10 INFO - at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
29-03-2024 09:53:10 INFO - at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
29-03-2024 09:53:10 INFO - at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
29-03-2024 09:53:10 INFO - at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
29-03-2024 09:53:10 INFO - at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
29-03-2024 09:53:10 INFO - at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
29-03-2024 09:53:10 INFO - at org.apache.spark.sql.execution.InputAdapter.inputRDDs(WholeStageCodegenExec.scala:371)
29-03-2024 09:53:10 INFO - at org.apache.spark.sql.execution.SortExec.inputRDDs(SortExec.scala:121)
29-03-2024 09:53:10 INFO - at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:605)
29-03-2024 09:53:10 INFO - at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
29-03-2024 09:53:10 INFO - at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
29-03-2024 09:53:10 INFO - at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
29-03-2024 09:53:10 INFO - at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
29-03-2024 09:53:10 INFO - at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
29-03-2024 09:53:10 INFO - at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
29-03-2024 09:53:10 INFO - at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:180)
29-03-2024 09:53:10 INFO - at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:154)
29-03-2024 09:53:10 INFO - at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:104)
29-03-2024 09:53:10 INFO - at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:102)
29-03-2024 09:53:10 INFO - at org.apache.spark.sql.execution.command.DataWritingCommandExec.doExecute(commands.scala:122)
29-03-2024 09:53:10 INFO - at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
29-03-2024 09:53:10 INFO - at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
29-03-2024 09:53:10 INFO - at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
29-03-2024 09:53:10 INFO - at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
29-03-2024 09:53:10 INFO - at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
29-03-2024 09:53:10 INFO - at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
29-03-2024 09:53:10 INFO - at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:80)
29-03-2024 09:53:10 INFO - at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:80)
29-03-2024 09:53:10 INFO - at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:656)
29-03-2024 09:53:10 INFO - at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:656)
29-03-2024 09:53:10 INFO - at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:77)
29-03-2024 09:53:10 INFO - at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:656)
29-03-2024 09:53:10 INFO - at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:273)
29-03-2024 09:53:10 INFO - at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:267)
.....
29-03-2024 09:53:10 INFO - at com.xxxxx.xxxxxx.xxx.xxx.Boot$.runJob(Boot.scala:55)
29-03-2024 09:53:10 INFO - at com.xxxxx.xxxxxx.xxx.xxx.Boot$.main(Boot.scala:30)
29-03-2024 09:53:10 INFO - at com.xxxxx.xxxxxx.xxx.xxx.Boot.main(Boot.scala)
29-03-2024 09:53:10 INFO - at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
29-03-2024 09:53:10 INFO - at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
29-03-2024 09:53:10 INFO - at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
29-03-2024 09:53:10 INFO - at java.lang.reflect.Method.invoke(Method.java:498)
29-03-2024 09:53:10 INFO - at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
29-03-2024 09:53:10 INFO - at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:904)
29-03-2024 09:53:10 INFO - at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:198)
29-03-2024 09:53:10 INFO - at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:228)
29-03-2024 09:53:10 INFO - at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:137)
29-03-2024 09:53:10 INFO - at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
29-03-2024 09:53:10 INFO - 2024-03-29 09:53:10 JST WARN org.apache.spark.scheduler.TaskSetManager: Lost task 230.0 in stage 0.0 (TID 261, xxx.xxx.hadoop.server, executor 13): TaskKilled (Stage cancelled)
29-03-2024 09:53:10 INFO - 2024-03-29 09:53:10 JST WARN org.apache.spark.scheduler.TaskSetManager: Lost task 364.0 in stage 0.0 (TID 281, xxx.xxx.hadoop.server, executor 13): TaskKilled (Stage cancelled)
29-03-2024 09:53:10 INFO - 2024-03-29 09:53:10 JST WARN org.apache.spark.scheduler.TaskSetManager: Lost task 476.0 in stage 0.0 (TID 209, xxx.xxx.hadoop.server, executor 14): TaskKilled (Stage cancelled)
29-03-2024 09:53:10 INFO - 2024-03-29 09:53:10 JST WARN org.apache.spark.scheduler.TaskSetManager: Lost task 238.0 in stage 0.0 (TID 259, xxx.xxx.hadoop.server, executor 71): TaskKilled (Stage cancelled)
29-03-2024 09:53:10 INFO - 2024-03-29 09:53:10 JST WARN org.apache.spark.scheduler.TaskSetManager: Lost task 464.0 in stage 0.0 (TID 274, xxx.xxx.hadoop.server, executor 8): TaskKilled (Stage cancelled)
29-03-2024 09:53:10 INFO - 2024-03-29 09:53:10 JST WARN org.apache.spark.scheduler.TaskSetManager: Lost task 394.0 in stage 0.0 (TID 233, xxx.xxx.hadoop.server, executor 15): TaskKilled (Stage cancelled)
29-03-2024 09:53:10 INFO - 2024-03-29 09:53:10 JST WARN org.apache.spark.scheduler.TaskSetManager: Lost task 392.0 in stage 0.0 (TID 212, xxx.xxx.hadoop.server, executor 8): TaskKilled (Stage cancelled)
29-03-2024 09:53:10 INFO - 2024-03-29 09:53:10 JST WARN org.apache.spark.scheduler.TaskSetManager: Lost task 513.0 in stage 0.0 (TID 247, xxx.xxx.hadoop.server, executor 33): TaskKilled (Stage cancelled)
29-03-2024 09:53:10 INFO - 2024-03-29 09:53:10 JST WARN org.apache.spark.scheduler.TaskSetManager: Lost task 479.0 in stage 0.0 (TID 254, xxx.xxx.hadoop.server, executor 94): TaskKilled (Stage cancelled)
29-03-2024 09:53:10 INFO - 2024-03-29 09:53:10 JST WARN org.apache.spark.scheduler.TaskSetManager: Lost task 365.0 in stage 0.0 (TID 211, xxx.xxx.hadoop.server, executor 16): TaskKilled (Stage cancelled)
29-03-2024 09:53:10 INFO - 2024-03-29 09:53:10 JST WARN org.apache.spark.scheduler.TaskSetManager: Lost task 458.0 in stage 0.0 (TID 286, xxx.xxx.hadoop.server, executor 16): TaskKilled (Stage cancelled)
29-03-2024 09:53:10 INFO - 2024-03-29 09:53:10 JST WARN org.apache.spark.scheduler.TaskSetManager: Lost task 300.0 in stage 0.0 (TID 283, xxx.xxx.hadoop.server, executor 42): TaskKilled (Stage cancelled)
29-03-2024 09:53:10 INFO - 2024-03-29 09:53:10 JST WARN org.apache.spark.scheduler.TaskSetManager: Lost task 308.0 in stage 0.0 (TID 279, xxx.xxx.hadoop.server, executor 75): TaskKilled (Stage cancelled)
29-03-2024 09:53:10 INFO - 2024-03-29 09:53:10 JST WARN org.apache.spark.scheduler.TaskSetManager: Lost task 134.0 in stage 0.0 (TID 231, xxx.xxx.hadoop.server, executor 75): TaskKilled (Stage cancelled)
29-03-2024 09:53:10 INFO - 2024-03-29 09:53:10 JST WARN org.apache.spark.scheduler.TaskSetManager: Lost task 440.0 in stage 0.0 (TID 278, xxx.xxx.hadoop.server, executor 89): TaskKilled (Stage cancelled)
29-03-2024 09:53:10 INFO - 2024-03-29 09:53:10 JST WARN org.apache.spark.scheduler.TaskSetManager: Lost task 374.0 in stage 0.0 (TID 277, xxx.xxx.hadoop.server, executor 85): TaskKilled (Stage cancelled)
29-03-2024 09:53:10 INFO - 2024-03-29 09:53:10 JST WARN org.apache.spark.scheduler.TaskSetManager: Lost task 343.0 in stage 0.0 (TID 237, xxx.xxx.hadoop.server, executor 85): TaskKilled (Stage cancelled)
29-03-2024 09:53:10 INFO - 2024-03-29 09:53:10 JST WARN org.apache.spark.scheduler.TaskSetManager: Lost task 345.0 in stage 0.0 (TID 265, xxx.xxx.hadoop.server, executor 3): TaskKilled (Stage cancelled)
29-03-2024 09:53:10 INFO - 2024-03-29 09:53:10 JST WARN org.apache.spark.scheduler.TaskSetManager: Lost task 441.0 in stage 0.0 (TID 267, xxx.xxx.hadoop.server, executor 100): TaskKilled (Stage cancelled)
29-03-2024 09:53:10 INFO - 2024-03-29 09:53:10 JST WARN org.apache.spark.scheduler.TaskSetManager: Lost task 502.0 in stage 0.0 (TID 214, xxx.xxx.hadoop.server, executor 79): TaskKilled (Stage cancelled)
29-03-2024 09:53:10 INFO - 2024-03-29 09:53:10 JST WARN org.apache.spark.scheduler.TaskSetManager: Lost task 465.0 in stage 0.0 (TID 227, xxx.xxx.hadoop.server, executor 54): TaskKilled (Stage cancelled)
29-03-2024 09:53:10 INFO - 2024-03-29 09:53:10 JST WARN org.apache.spark.scheduler.TaskSetManager: Lost task 426.0 in stage 0.0 (TID 226, xxx.xxx.hadoop.server, executor 52): TaskKilled (Stage cancelled)
29-03-2024 09:53:10 INFO - 2024-03-29 09:53:10 JST WARN org.apache.spark.scheduler.TaskSetManager: Lost task 359.0 in stage 0.0 (TID 248, xxx.xxx.hadoop.server, executor 100): TaskKilled (Stage cancelled)
29-03-2024 09:53:10 INFO - 2024-03-29 09:53:10 JST WARN org.apache.spark.scheduler.TaskSetManager: Lost task 488.0 in stage 0.0 (TID 123, xxx.xxx.hadoop.server, executor 63): TaskKilled (Stage cancelled)
29-03-2024 09:53:10 INFO - 2024-03-29 09:53:10 JST WARN org.apache.spark.scheduler.TaskSetManager: Lost task 354.0 in stage 0.0 (TID 205, xxx.xxx.hadoop.server, executor 43): TaskKilled (Stage cancelled)
29-03-2024 09:53:10 INFO - 2024-03-29 09:53:10 JST WARN org.apache.spark.scheduler.TaskSetManager: Lost task 506.0 in stage 0.0 (TID 221, xxx.xxx.hadoop.server, executor 43): TaskKilled (Stage cancelled)
29-03-2024 09:53:10 INFO - 2024-03-29 09:53:10 JST WARN org.apache.spark.scheduler.TaskSetManager: Lost task 461.0 in stage 0.0 (TID 292, xxx.xxx.hadoop.server, executor 32): TaskKilled (Stage cancelled)
29-03-2024 09:53:10 INFO - 2024-03-29 09:53:10 JST WARN org.apache.spark.scheduler.TaskSetManager: Lost task 423.0 in stage 0.0 (TID 253, xxx.xxx.hadoop.server, executor 32): TaskKilled (Stage cancelled)
29-03-2024 09:53:10 INFO - 2024-03-29 09:53:10 JST WARN org.apache.spark.scheduler.TaskSetManager: Lost task 367.0 in stage 0.0 (TID 234, xxx.xxx.hadoop.server, executor 89): TaskKilled (Stage cancelled)
29-03-2024 09:53:10 INFO - 2024-03-29 09:53:10 JST WARN org.apache.spark.scheduler.TaskSetManager: Lost task 410.0 in stage 0.0 (TID 140, xxx.xxx.hadoop.server, executor 78): TaskKilled (Stage cancelled)
29-03-2024 09:53:10 INFO - 2024-03-29 09:53:10 JST WARN org.apache.spark.scheduler.TaskSetManager: Lost task 511.0 in stage 0.0 (TID 164, xxx.xxx.hadoop.server, executor 62): TaskKilled (Stage cancelled)
29-03-2024 09:53:10 INFO - 2024-03-29 09:53:10 JST WARN org.apache.spark.scheduler.TaskSetManager: Lost task 484.0 in stage 0.0 (TID 28, xxx.xxx.hadoop.server, executor 12): TaskKilled (Stage cancelled)
29-03-2024 09:53:10 INFO - 2024-03-29 09:53:10 JST WARN org.apache.spark.scheduler.TaskSetManager: Lost task 328.0 in stage 0.0 (TID 230, xxx.xxx.hadoop.server, executor 19): TaskKilled (Stage cancelled)
29-03-2024 09:53:10 INFO - 2024-03-29 09:53:10 JST WARN org.apache.spark.scheduler.TaskSetManager: Lost task 348.0 in stage 0.0 (TID 258, xxx.xxx.hadoop.server, executor 36): TaskKilled (Stage cancelled)
29-03-2024 09:53:10 INFO - 2024-03-29 09:53:10 JST WARN org.apache.spark.scheduler.TaskSetManager: Lost task 333.0 in stage 0.0 (TID 255, xxx.xxx.hadoop.server, executor 27): TaskKilled (Stage cancelled)
29-03-2024 09:53:10 INFO - 2024-03-29 09:53:10 JST WARN org.apache.spark.scheduler.TaskSetManager: Lost task 485.0 in stage 0.0 (TID 256, xxx.xxx.hadoop.server, executor 27): TaskKilled (Stage cancelled)
29-03-2024 09:53:10 INFO - 2024-03-29 09:53:10 JST WARN org.apache.spark.scheduler.TaskSetManager: Lost task 331.0 in stage 0.0 (TID 271, xxx.xxx.hadoop.server, executor 19): TaskKilled (Stage cancelled)
29-03-2024 09:53:10 INFO - 2024-03-29 09:53:10 JST WARN org.apache.spark.scheduler.TaskSetManager: Lost task 411.0 in stage 0.0 (TID 282, xxx.xxx.hadoop.server, executor 36): TaskKilled (Stage cancelled)
29-03-2024 09:53:10 INFO - 2024-03-29 09:53:10 JST WARN org.apache.spark.scheduler.TaskSetManager: Lost task 482.0 in stage 0.0 (TID 290, xxx.xxx.hadoop.server, executor 41): TaskKilled (Stage cancelled)
29-03-2024 09:53:10 INFO - 2024-03-29 09:53:10 JST WARN org.apache.spark.scheduler.TaskSetManager: Lost task 471.0 in stage 0.0 (TID 288, xxx.xxx.hadoop.server, executor 41): TaskKilled (Stage cancelled)
29-03-2024 09:53:10 INFO - 2024-03-29 09:53:10 JST WARN org.apache.spark.scheduler.TaskSetManager: Lost task 490.0 in stage 0.0 (TID 291, xxx.xxx.hadoop.server, executor 93): TaskKilled (Stage cancelled)
29-03-2024 09:53:10 INFO - 2024-03-29 09:53:10 JST WARN org.apache.spark.scheduler.TaskSetManager: Lost task 379.0 in stage 0.0 (TID 285, xxx.xxx.hadoop.server, executor 93): TaskKilled (Stage cancelled)
29-03-2024 09:53:10 INFO - 2024-03-29 09:53:10 JST WARN org.apache.spark.scheduler.TaskSetManager: Lost task 350.0 in stage 0.0 (TID 216, xxx.xxx.hadoop.server, executor 77): TaskKilled (Stage cancelled)
29-03-2024 09:53:10 INFO - 2024-03-29 09:53:10 JST WARN org.apache.spark.scheduler.TaskSetManager: Lost task 396.0 in stage 0.0 (TID 287, xxx.xxx.hadoop.server, executor 60): TaskKilled (Stage cancelled)
29-03-2024 09:53:10 INFO - 2024-03-29 09:53:10 JST WARN org.apache.spark.scheduler.TaskSetManager: Lost task 321.0 in stage 0.0 (TID 210, xxx.xxx.hadoop.server, executor 90): TaskKilled (Stage cancelled)
29-03-2024 09:53:10 INFO - 2024-03-29 09:53:10 JST WARN org.apache.spark.scheduler.TaskSetManager: Lost task 337.0 in stage 0.0 (TID 235, xxx.xxx.hadoop.server, executor 60): TaskKilled (Stage cancelled)
29-03-2024 09:53:10 INFO - 2024-03-29 09:53:10 JST WARN org.apache.spark.scheduler.TaskSetManager: Lost task 90.0 in stage 0.0 (TID 264, xxx.xxx.hadoop.server, executor 86): TaskKilled (Stage cancelled)
29-03-2024 09:53:10 INFO - 2024-03-29 09:53:10 JST WARN org.apache.spark.scheduler.TaskSetManager: Lost task 332.0 in stage 0.0 (TID 243, xxx.xxx.hadoop.server, executor 92): TaskKilled (Stage cancelled)
29-03-2024 09:53:10 INFO - 2024-03-29 09:53:10 JST WARN org.apache.spark.scheduler.TaskSetManager: Lost task 446.0 in stage 0.0 (TID 284, xxx.xxx.hadoop.server, executor 92): TaskKilled (Stage cancelled)
29-03-2024 09:53:10 INFO - 2024-03-29 09:53:10 JST WARN org.apache.spark.scheduler.TaskSetManager: Lost task 496.0 in stage 0.0 (TID 156, xxx.xxx.hadoop.server, executor 81): TaskKilled (Stage cancelled)
29-03-2024 09:53:10 INFO - 2024-03-29 09:53:10 JST WARN org.apache.spark.scheduler.TaskSetManager: Lost task 385.0 in stage 0.0 (TID 241, xxx.xxx.hadoop.server, executor 22): TaskKilled (Stage cancelled)
29-03-2024 09:53:10 INFO - 2024-03-29 09:53:10 JST WARN org.apache.spark.scheduler.TaskSetManager: Lost task 514.0 in stage 0.0 (TID 266, xxx.xxx.hadoop.server, executor 25): TaskKilled (Stage cancelled)
29-03-2024 09:53:10 INFO - 2024-03-29 09:53:10 JST WARN org.apache.spark.scheduler.TaskSetManager: Lost task 462.0 in stage 0.0 (TID 222, xxx.xxx.hadoop.server, executor 90): TaskKilled (Stage cancelled)
29-03-2024 09:53:10 INFO - 2024-03-29 09:53:10 JST WARN org.apache.spark.scheduler.TaskSetManager: Lost task 401.0 in stage 0.0 (TID 263, xxx.xxx.hadoop.server, executor 22): TaskKilled (Stage cancelled)
29-03-2024 09:53:10 INFO - 2024-03-29 09:53:10 JST WARN org.apache.spark.scheduler.TaskSetManager: Lost task 450.0 in stage 0.0 (TID 228, xxx.xxx.hadoop.server, executor 25): TaskKilled (Stage cancelled)
29-03-2024 09:53:10 INFO - 2024-03-29 09:53:10 JST WARN org.apache.spark.scheduler.TaskSetManager: Lost task 225.0 in stage 0.0 (TID 276, xxx.xxx.hadoop.server, executor 4): TaskKilled (Stage cancelled)
29-03-2024 09:53:10 INFO - 2024-03-29 09:53:10 JST WARN org.apache.spark.scheduler.TaskSetManager: Lost task 403.0 in stage 0.0 (TID 239, xxx.xxx.hadoop.server, executor 7): TaskKilled (Stage cancelled)
29-03-2024 09:53:10 INFO - 2024-03-29 09:53:10 JST WARN org.apache.spark.scheduler.TaskSetManager: Lost task 430.0 in stage 0.0 (TID 280, xxx.xxx.hadoop.server, executor 4): TaskKilled (Stage cancelled)
29-03-2024 09:53:10 INFO - 2024-03-29 09:53:10 JST WARN org.apache.spark.scheduler.TaskSetManager: Lost task 505.0 in stage 0.0 (TID 215, xxx.xxx.hadoop.server, executor 91): TaskKilled (Stage cancelled)
29-03-2024 09:53:10 INFO - 2024-03-29 09:53:10 JST WARN org.apache.spark.scheduler.TaskSetManager: Lost task 201.0 in stage 0.0 (TID 269, xxx.xxx.hadoop.server, executor 99): TaskKilled (Stage cancelled)
29-03-2024 09:53:10 INFO - 2024-03-29 09:53:10 JST WARN org.apache.spark.scheduler.TaskSetManager: Lost task 138.0 in stage 0.0 (TID 257, xxx.xxx.hadoop.server, executor 99): TaskKilled (Stage cancelled)
29-03-2024 09:53:10 INFO - 2024-03-29 09:53:10 JST WARN org.apache.spark.scheduler.TaskSetManager: Lost task 314.0 in stage 0.0 (TID 260, xxx.xxx.hadoop.server, executor 47): TaskKilled (Stage cancelled)
29-03-2024 09:53:10 INFO - 2024-03-29 09:53:10 JST WARN org.apache.spark.scheduler.TaskSetManager: Lost task 501.0 in stage 0.0 (TID 10, xxx.xxx.hadoop.server, executor 2): TaskKilled (Stage cancelled)
29-03-2024 09:53:10 INFO - 2024-03-29 09:53:10 JST WARN org.apache.spark.scheduler.TaskSetManager: Lost task 397.0 in stage 0.0 (TID 272, xxx.xxx.hadoop.server, executor 86): TaskKilled (Stage cancelled)
29-03-2024 09:53:10 INFO - 2024-03-29 09:53:10 JST WARN org.apache.spark.scheduler.TaskSetManager: Lost task 453.0 in stage 0.0 (TID 217, xxx.xxx.hadoop.server, executor 45): TaskKilled (Stage cancelled)
29-03-2024 09:53:10 INFO - 2024-03-29 09:53:10 JST WARN org.apache.spark.scheduler.TaskSetManager: Lost task 497.0 in stage 0.0 (TID 270, xxx.xxx.hadoop.server, executor 17): TaskKilled (Stage cancelled)
29-03-2024 09:53:10 INFO - 2024-03-29 09:53:10 JST WARN org.apache.spark.scheduler.TaskSetManager: Lost task 330.0 in stage 0.0 (TID 268, xxx.xxx.hadoop.server, executor 47): TaskKilled (Stage cancelled)
29-03-2024 09:53:10 INFO - 2024-03-29 09:53:10 JST ERROR org.apache.spark.scheduler.TaskSetManager: Total size of serialized results of 220 tasks (1036.7 MB) is bigger than spark.driver.maxResultSize (1024.0 MB)
29-03-2024 09:53:10 INFO - 2024-03-29 09:53:10 JST WARN org.apache.spark.scheduler.TaskSetManager: Lost task 368.0 in stage 0.0 (TID 236, xxx.xxx.hadoop.server, executor 9): TaskKilled (Stage cancelled)
29-03-2024 09:53:10 INFO - 2024-03-29 09:53:10 JST WARN org.apache.spark.scheduler.TaskSetManager: Lost task 503.0 in stage 0.0 (TID 262, xxx.xxx.hadoop.server, executor 24): TaskKilled (Stage cancelled)
29-03-2024 09:53:10 INFO - 2024-03-29 09:53:10 JST WARN org.apache.spark.scheduler.TaskSetManager: Lost task 378.0 in stage 0.0 (TID 289, xxx.xxx.hadoop.server, executor 9): TaskKilled (Stage cancelled)
29-03-2024 09:53:10 INFO - 2024-03-29 09:53:10 JST WARN org.apache.spark.scheduler.TaskSetManager: Lost task 475.0 in stage 0.0 (TID 95, xxx.xxx.hadoop.server, executor 83): TaskKilled (Stage cancelled)
29-03-2024 09:53:10 INFO - 2024-03-29 09:53:10 JST WARN org.apache.spark.scheduler.TaskSetManager: Lost task 500.0 in stage 0.0 (TID 293, xxx.xxx.hadoop.server, executor 77): TaskKilled (Stage cancelled)
29-03-2024 09:53:10 INFO - 2024-03-29 09:53:10 JST WARN org.apache.spark.scheduler.TaskSetManager: Lost task 481.0 in stage 0.0 (TID 249, xxx.xxx.hadoop.server, executor 64): TaskKilled (Stage cancelled)
29-03-2024 09:53:10 INFO - 2024-03-29 09:53:10 JST WARN org.apache.spark.scheduler.TaskSetManager: Lost task 499.0 in stage 0.0 (TID 273, xxx.xxx.hadoop.server, executor 83): TaskKilled (Stage cancelled)
29-03-2024 09:53:10 INFO - 2024-03-29 09:53:10 JST ERROR com.xxxxx.xxxxxx.xxx.xxx.Boot$: boot error
29-03-2024 09:53:10 INFO - org.apache.spark.SparkException: Job aborted.
29-03-2024 09:53:10 INFO - at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:224)
29-03-2024 09:53:10 INFO - at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:154)
29-03-2024 09:53:10 INFO - at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:104)
29-03-2024 09:53:10 INFO - at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:102)
29-03-2024 09:53:10 INFO - at org.apache.spark.sql.execution.command.DataWritingCommandExec.doExecute(commands.scala:122)
29-03-2024 09:53:10 INFO - at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
29-03-2024 09:53:10 INFO - at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
29-03-2024 09:53:10 INFO - at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
29-03-2024 09:53:10 INFO - at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
29-03-2024 09:53:10 INFO - at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
29-03-2024 09:53:10 INFO - at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
29-03-2024 09:53:10 INFO - at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:80)
29-03-2024 09:53:10 INFO - at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:80)
29-03-2024 09:53:10 INFO - at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:656)
29-03-2024 09:53:10 INFO - at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:656)
29-03-2024 09:53:10 INFO - at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:77)
29-03-2024 09:53:10 INFO - at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:656)
29-03-2024 09:53:10 INFO - at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:273)
29-03-2024 09:53:10 INFO - at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:267)
29-03-2024 09:53:10 INFO - at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:225)
29-03-2024 09:53:10 INFO - at com.xxxxx.xxxxxx.xxx.xxx.writer.WriterImpl.writeDataToHDFS(WriterImpl.scala:27)
29-03-2024 09:53:10 INFO - at com.xxxxx.xxxxxx.xxx.xxx.executor.ExportExecutor.execute(ExportExecutor.scala:45)
29-03-2024 09:53:10 INFO - at com.xxxxx.xxxxxx.xxx.xxx.executor.base.CommandExecutorImpl$class.execute(CommandExecutor.scala:36)
29-03-2024 09:53:10 INFO - at com.xxxxx.xxxxxx.xxx.xxx.Boot$$anon$1.execute(Boot.scala:49)
29-03-2024 09:53:10 INFO - at com.xxxxx.xxxxxx.xxx.xxx.Boot$.runJob(Boot.scala:55)
29-03-2024 09:53:10 INFO - at com.xxxxx.xxxxxx.xxx.xxx.Boot$.main(Boot.scala:30)
29-03-2024 09:53:10 INFO - at com.xxxxx.xxxxxx.xxx.xxx.Boot.main(Boot.scala)
29-03-2024 09:53:10 INFO - at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
29-03-2024 09:53:10 INFO - at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
29-03-2024 09:53:10 INFO - at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
29-03-2024 09:53:10 INFO - at java.lang.reflect.Method.invoke(Method.java:498)
29-03-2024 09:53:10 INFO - at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
29-03-2024 09:53:10 INFO - at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:904)
29-03-2024 09:53:10 INFO - at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:198)
29-03-2024 09:53:10 INFO - at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:228)
29-03-2024 09:53:10 INFO - at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:137)
29-03-2024 09:53:10 INFO - at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
29-03-2024 09:53:10 INFO - Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Total size of serialized results of 219 tasks (1029.1 MB) is bigger than spark.driver.maxResultSize (1024.0 MB)
29-03-2024 09:53:10 INFO - at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1651)
29-03-2024 09:53:10 INFO - at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1639)
29-03-2024 09:53:10 INFO - at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1638)
29-03-2024 09:53:10 INFO - at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
29-03-2024 09:53:10 INFO - at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
29-03-2024 09:53:10 INFO - at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1638)
29-03-2024 09:53:10 INFO - at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
29-03-2024 09:53:10 INFO - at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
29-03-2024 09:53:10 INFO - at scala.Option.foreach(Option.scala:257)
29-03-2024 09:53:10 INFO - at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:831)
29-03-2024 09:53:10 INFO - at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1872)
29-03-2024 09:53:10 INFO - at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1821)
29-03-2024 09:53:10 INFO - at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1810)
29-03-2024 09:53:10 INFO - at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
29-03-2024 09:53:10 INFO - at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:642)
29-03-2024 09:53:10 INFO - at org.apache.spark.SparkContext.runJob(SparkContext.scala:2039)
29-03-2024 09:53:10 INFO - at org.apache.spark.SparkContext.runJob(SparkContext.scala:2060)
29-03-2024 09:53:10 INFO - at org.apache.spark.SparkContext.runJob(SparkContext.scala:2079)
29-03-2024 09:53:10 INFO - at org.apache.spark.SparkContext.runJob(SparkContext.scala:2104)
29-03-2024 09:53:10 INFO - at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:945)
29-03-2024 09:53:10 INFO - at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
29-03-2024 09:53:10 INFO - at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
29-03-2024 09:53:10 INFO - at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
29-03-2024 09:53:10 INFO - at org.apache.spark.rdd.RDD.collect(RDD.scala:944)
29-03-2024 09:53:10 INFO - at org.apache.spark.RangePartitioner$.sketch(Partitioner.scala:309)
29-03-2024 09:53:10 INFO - at org.apache.spark.RangePartitioner.<init>(Partitioner.scala:171)
29-03-2024 09:53:10 INFO - at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec$.prepareShuffleDependency(ShuffleExchangeExec.scala:224)
29-03-2024 09:53:10 INFO - at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.prepareShuffleDependency(ShuffleExchangeExec.scala:91)
29-03-2024 09:53:10 INFO - at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec$$anonfun$doExecute$1.apply(ShuffleExchangeExec.scala:128)
29-03-2024 09:53:10 INFO - at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec$$anonfun$doExecute$1.apply(ShuffleExchangeExec.scala:119)
29-03-2024 09:53:10 INFO - at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:52)
29-03-2024 09:53:10 INFO - at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.doExecute(ShuffleExchangeExec.scala:119)
29-03-2024 09:53:10 INFO - at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
29-03-2024 09:53:10 INFO - at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
29-03-2024 09:53:10 INFO - at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
29-03-2024 09:53:10 INFO - at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
29-03-2024 09:53:10 INFO - at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
29-03-2024 09:53:10 INFO - at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
29-03-2024 09:53:10 INFO - at org.apache.spark.sql.execution.InputAdapter.inputRDDs(WholeStageCodegenExec.scala:371)
29-03-2024 09:53:10 INFO - at org.apache.spark.sql.execution.SortExec.inputRDDs(SortExec.scala:121)
29-03-2024 09:53:10 INFO - at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:605)
29-03-2024 09:53:10 INFO - at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
29-03-2024 09:53:10 INFO - at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
29-03-2024 09:53:10 INFO - at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
29-03-2024 09:53:10 INFO - at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
29-03-2024 09:53:10 INFO - at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
29-03-2024 09:53:10 INFO - at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
29-03-2024 09:53:10 INFO - at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:180)
29-03-2024 09:53:10 INFO - ... 36 more
29-03-2024 09:53:10 INFO - 2024-03-29 09:53:10 JST INFO com.xxxxx.xxxxxx.xxx.xxx.Boot$: going to stop the spark
29-03-2024 09:53:17 INFO - 2024-03-29 09:53:17 JST INFO com.xxxxx.xxxxxx.xxx.xxx.Boot$: going to exit from main existStatus:1
29-03-2024 09:53:18 INFO - Process with id 53171 completed unsuccessfully in 105 seconds.
29-03-2024 09:53:18 ERROR - Job run failed!

Spark code

val inputDf = read(schema=???, path=???)
val outputDf = inputDf
  .filter(???)
  .transform(applySchema(???.schema))
  .orderBy(id)

writer.writeOutput(outputDf, ???)

Root Cause

According to the Spark plan, an orderBy right before writing out introduces a range partitioning exchange at the end of the job.

== Physical Plan ==
Execute InsertIntoHadoopFsRelationCommand InsertIntoHadoopFsRelationCommand hdfs://xxx/user/xxxx/xxx-xxx/xxx_xxx_xxx/data/output/, false, CSV, ...
+- *(2) Sort [offer_id#0 ASC NULLS FIRST], true, 0
   +- Exchange rangepartitioning(id#0 ASC NULLS FIRST, 3200)
      +- *(1) Project [id#0,...
         +- *(1) Filter (...)
            +- *(1) FileScan csv [id...

Checking the range partitioning source code (visible in the stack trace as RangePartitioner$.sketch at Partitioner.scala:309), there is a collect() inside it: sampled keys from every partition are collected back to the driver to compute the range boundaries. This collect() is very likely the reason the driver-side result size limit is exceeded and the Spark driver fails.
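
Conceptually, range partitioning has to bring a sample of the sort keys from every partition back to the driver in order to compute the partition boundaries, which is what blows past spark.driver.maxResultSize here. A minimal conceptual sketch of that idea (not the actual Spark source; the function name, signature and the naive sampling are my own simplification, while the real RangePartitioner uses reservoir sampling in sketch() and a weighted determineBounds()):

import org.apache.spark.rdd.RDD

// Conceptual sketch only: how range boundaries are derived from a driver-side sample.
def approximateRangeBounds(keys: RDD[Long], numPartitions: Int, samplesPerPartition: Int = 20): Array[Long] = {
  val sampled = keys
    .mapPartitions(it => it.take(samplesPerPartition)) // a few keys per partition
    .collect()                                         // <- every sampled key lands on the driver
    .sorted
  require(sampled.nonEmpty, "cannot compute bounds for an empty RDD")
  // pick numPartitions - 1 roughly evenly spaced boundaries from the sorted sample
  (1 until numPartitions)
    .map(i => sampled(math.min(sampled.length - 1, i * sampled.length / numPartitions)))
    .toArray
}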

Trial

  1. Try to force a hash partition. This does not work: Spark simply adds a hash partitioning exchange before the range partitioning, as the plan below shows.
    == Physical Plan ==
    Execute InsertIntoHadoopFsRelationCommand InsertIntoHadoopFsRelationCommand hdfs://xxx/user/xxxx/xxx-xxx/xxx_xxx_xxx/data/output/, false, CSV, ...
    +- *(2) Sort [offer_id#0 ASC NULLS FIRST], true, 0
       +- Exchange rangepartitioning(id#0 ASC NULLS FIRST, 3200)
          +- Exchange hashpartitioning(id#0, 3200)
             +- *(1) Project [id#0,...
                +- *(1) Filter (...)
                   +- *(1) FileScan csv [id...
  2. Try removing the limit by setting spark.driver.maxResultSize=0. This led to a driver-side OOM instead.
  3. Remove the orderBy and force a hash partition instead. This works:
    val inputDf = read(schema=???, path=???)
    val outputDf = inputDf
      .filter(???)
      .transform(applySchema(???.schema))
      .repartition(col("id")) // <<< Force a hash repartition instead of the global orderBy

    writer.writeOutput(outputDf, ???)
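
If a global ordering is not strictly required and it is enough for each output file to be sorted internally, another option is to keep the hash partitioning and sort inside each partition, which avoids the range-partitioning exchange (and its driver-side sketch) entirely. A sketch, assuming the DataFrame has an id column and CSV output as in the plan above; writeSortedWithinPartitions is a hypothetical helper name:

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.col

// Hash-partition by id, then sort only inside each partition.
// Every output file is ordered by id, but no sampled keys are collected to the driver.
def writeSortedWithinPartitions(df: DataFrame, path: String): Unit = {
  df.repartition(col("id"))           // hash partitioning, no range sketch
    .sortWithinPartitions(col("id"))  // local sort per partition
    .write
    .mode("overwrite")
    .option("header", "true")
    .csv(path)
}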

In Java 18, the array expansion code has changed significantly compared to Java 8. In Java 18, both ArrayList and PriorityQueue delegate their array expansion to ArraysSupport.newLength().

ArrayList

We only discuss the scenario of appending a single element.

Let’s take a look at what is inside ArrayList.

// This is ArrayList's simplest adding method, which appends one element to the end
public boolean add(E e) {
    // 1. The public add() modifies modCount directly.
    //    Java 8 did this inside ensureCapacityInternal().
    modCount++;
    // 2. elementData is the internal Object[] array wrapped and maintained by ArrayList
    add(e, elementData, size);
    return true;
}

private void add(E e, Object[] elementData, int s) {
    // 3. s == elementData.length means the internal array is full
    if (s == elementData.length)
        // 4. The internal array is replaced with a new array of bigger capacity
        elementData = grow();
    elementData[s] = e;
    size = s + 1;
}

The main responsibility of grow() is to expand the size of the internal array elementData.

// When the caller wants to append 1 element, the no-arg grow() passes minCapacity = size + 1
private Object[] grow() {
    return grow(size + 1);
}

private Object[] grow(int minCapacity) {
    int oldCapacity = elementData.length;
    // 4. Test whether the internal array is still the shared empty array
    if (oldCapacity > 0 || elementData != DEFAULTCAPACITY_EMPTY_ELEMENTDATA) {
        // 4.1 If not empty, calculate a new capacity using ArraysSupport.newLength()
        int newCapacity = ArraysSupport.newLength(oldCapacity,
                minCapacity - oldCapacity, /* minimum growth */
                oldCapacity >> 1           /* preferred growth */);
        return elementData = Arrays.copyOf(elementData, newCapacity);
    } else {
        // 4.2 If the internal array is still empty, our grow(size + 1) call allocates
        //     max(DEFAULT_CAPACITY, minCapacity), where DEFAULT_CAPACITY = 10
        return elementData = new Object[Math.max(DEFAULT_CAPACITY, minCapacity)];
    }
}

Compared to Java 8, Java 18 removed ensureCapacityInternal() and ensureExplicitCapacity() and let grow() take on more responsibility. Previously, ensureCapacityInternal() was responsible for testing elementData != DEFAULTCAPACITY_EMPTY_ELEMENTDATA, while ensureExplicitCapacity() handled part of what ArraysSupport.newLength() does now.

Let’s take a look at what is inside ArraysSupport.

// A soft maximum array length imposed by array growth computations.
// Some JVMs (such as HotSpot) have an implementation limit that causes
// OutOfMemoryError("Requested array size exceeds VM limit") above this.
public static final int SOFT_MAX_ARRAY_LENGTH = Integer.MAX_VALUE - 8;

// 5. ArraysSupport.newLength() is responsible for calculating the new length.
public static int newLength(int oldLength, int minGrowth, int prefGrowth) {
    // 5.1 In our case, minGrowth = minCapacity - oldCapacity = size + 1 - size = 1
    //     max(minGrowth, prefGrowth) = max(1, size / 2) = size / 2
    //     So prefLength = 3/2 * size
    int prefLength = oldLength + Math.max(minGrowth, prefGrowth); // might overflow
    // 5.2 If 3/2 * size <= SOFT_MAX, i.e. size <= 2/3 * SOFT_MAX, return prefLength
    if (0 < prefLength && prefLength <= SOFT_MAX_ARRAY_LENGTH) {
        return prefLength;
    } else {
        return hugeLength(oldLength, minGrowth);
    }
}

// In our general appending case, minGrowth = 1
private static int hugeLength(int oldLength, int minGrowth) {
    int minLength = oldLength + minGrowth;
    if (minLength < 0) { // overflow
        throw new OutOfMemoryError(
            "Required array length " + oldLength + " + " + minGrowth + " is too large");
    // 5.3 If size + 1 <= SOFT_MAX, i.e. size <= SOFT_MAX - 1, return SOFT_MAX
    } else if (minLength <= SOFT_MAX_ARRAY_LENGTH) {
        return SOFT_MAX_ARRAY_LENGTH;
    } else {
        // 5.4 If size > SOFT_MAX - 1, return minLength, i.e. grow by just 1 per append
        return minLength;
    }
}

Java 18 takes the implementation limit of certain JVMs (such as HotSpot) into account by capping growth at SOFT_MAX = Integer.MAX_VALUE - 8, rather than expanding the internal array directly to Integer.MAX_VALUE.

The growth of the internal array while appending elements one by one is now clear (a small simulation follows the list below):

  1. When 1 < size < 2/3 * SOFT_MAX, elementData grows by 1/2 * size.
  2. When 2/3 * SOFT_MAX < size <= SOFT_MAX - 1, adding another 1/2 * size could push elementData.length past Integer.MAX_VALUE, so elementData is extended to SOFT_MAX directly.
  3. When size > SOFT_MAX - 1, it keeps growing by just 1.
    • In most common cases we never store anywhere near Integer.MAX_VALUE elements in a single ArrayList.
    • Growing by only 1 at this point seems intended to postpone the inevitable failure as long as possible.
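
To make the growth pattern concrete, here is a small self-contained Scala simulation of the newLength() logic shown above. It only re-implements the formula; it does not call the real jdk.internal.util.ArraysSupport, which lives in an internal package:

object GrowthSimulation {
  val SoftMaxArrayLength: Int = Integer.MAX_VALUE - 8

  // Re-implementation of the newLength()/hugeLength() logic shown above
  def newLength(oldLength: Int, minGrowth: Int, prefGrowth: Int): Int = {
    val prefLength = oldLength + math.max(minGrowth, prefGrowth) // may overflow
    if (0 < prefLength && prefLength <= SoftMaxArrayLength) prefLength
    else {
      val minLength = oldLength + minGrowth
      if (minLength < 0) throw new OutOfMemoryError("Required array length is too large")
      else if (minLength <= SoftMaxArrayLength) SoftMaxArrayLength
      else minLength
    }
  }

  def main(args: Array[String]): Unit = {
    // ArrayList-style growth when appending one element at a time,
    // starting from DEFAULT_CAPACITY = 10 (minGrowth = 1, prefGrowth = capacity / 2)
    val steps = Iterator.iterate(10)(c => newLength(c, 1, c >> 1)).take(10).toList
    println(steps.mkString(" -> ")) // 10 -> 15 -> 22 -> 33 -> 49 -> 73 -> 109 -> 163 -> 244 -> 366
  }
}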

PriorityQueue

private void grow(int minCapacity) {
    int oldCapacity = queue.length;
    // Double size if small; else grow by 50%
    int newCapacity = ArraysSupport.newLength(oldCapacity,
            minCapacity - oldCapacity, /* minimum growth */
            oldCapacity < 64 ? oldCapacity + 2 : oldCapacity >> 1
                               /* preferred growth */);
    queue = Arrays.copyOf(queue, newCapacity);
}

The only difference in the grow() strategy between PriorityQueue and ArrayList is the preferred growth: ArrayList prefers to grow by half of its current capacity, while PriorityQueue prefers to grow by oldCapacity + 2 (roughly doubling) while the capacity is below 64, and by half once it reaches 64.
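
Using the same GrowthSimulation.newLength() sketch from the ArrayList section, the capacity sequence for PriorityQueue (whose documented default initial capacity is 11) would look roughly like this:

// PriorityQueue-style preferred growth: oldCapacity + 2 below 64, +50% from 64 onwards
val pqSteps = Iterator.iterate(11) { c =>
  GrowthSimulation.newLength(c, 1, if (c < 64) c + 2 else c >> 1)
}.take(8).toList
println(pqSteps.mkString(" -> ")) // 11 -> 24 -> 50 -> 102 -> 153 -> 229 -> 343 -> 514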

Summary

https://docs.google.com/spreadsheets/d/1RrJplFDo4eUL97iUsypflmbEnVFG2VhLAe-VERYTwzg/edit?usp=sharing

Reference

1 HashMap

Short description Link
Time Complexity of HashMap methods https://stackoverflow.com/questions/4577998/time-complexity-of-hashmap-methods
Time Complexity of HashMap get() https://skyfly.xyz/2020/02/25/Java/HashMapTimeComplexity/
Understanding HashMap resize() https://segmentfault.com/a/1190000015812438
The main document that I learn HashMap https://blog.csdn.net/v123411739/article/details/78996181
Another document contains some interview questions https://blog.csdn.net/v123411739/article/details/115364158
A document with illustration (need to login CSDN) https://blog.csdn.net/yinwenjie/article/details/102531402

Questions:

  • I haven’t clearly understood the remove() method yet.
  • tableSizeFor() changed after Java 8, but I don’t know how to locate the exact version in which a piece of code changed.
  • After the sharing:
    • The actual binary calculation of the Map’s capacity. For example, if the capacity is 32, what is it in binary?
      • For example, 111 in binary is 7, but 7 cannot be a capacity.
    • What is amortized time complexity?

Problems solved:

  • The difference between (n - 1) & hash and (e.hash & oldCap) == 0, learned by reading Understanding HashMap resize() (a small bit-twiddling demo follows below).
  • The time complexity of put(), get() and remove() ranges from O(1) to O(k), where k is the length of a bin’s linked list or the height of the red-black tree.
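
A small bit-twiddling demo of the two expressions (an illustration of the trick only, not HashMap's actual code):

// Resize from oldCap = 16 to newCap = 32: a key either stays at its old bucket index
// or moves to oldIndex + oldCap, and (hash & oldCap) == 0 tells us which of the two.
val oldCap = 16
val newCap = oldCap << 1
for (h <- Seq(0x13, 0x23, 0x5a, 0x7f)) {
  val oldIndex = h & (oldCap - 1)      // bucket index before resize: hash & (n - 1)
  val newIndex = h & (newCap - 1)      // bucket index after resize
  val staysInPlace = (h & oldCap) == 0 // the resize() shortcut: test only the new bit
  assert(newIndex == (if (staysInPlace) oldIndex else oldIndex + oldCap))
  println(f"hash=0x$h%02x oldIndex=$oldIndex newIndex=$newIndex staysInPlace=$staysInPlace")
}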

2 ArrayList

Short description Link
A main document that I’ve studied https://blog.csdn.net/zxt0601/article/details/77281231
Another ArrayList docs that I didn’t read too much https://blog.51cto.com/u_13604316/2673349

Questions:

  • Can System.arraycopy overlap? In Linux there are two ways to copy memory, and one of them can cause overlap and data loss. (See the quick check after this list.)
  • Will an ArrayList ever shrink?
  • What is System.arraycopy’s space complexity?
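
On the first question: the System.arraycopy JavaDoc states that when src and dest are the same array, the copy behaves as if it went through a temporary array first, so an overlapping self-copy does not corrupt data. A quick sanity check:

// Shift a(0..3) one slot to the right within the same array; source and destination overlap.
val a = Array(1, 2, 3, 4, 5)
System.arraycopy(a, 0, a, 1, 4)
println(a.mkString(", ")) // 1, 1, 2, 3, 4 -- no corruption despite the overlap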

Problem solved:

  • How does ArrayList insert a value?
    • grow() -> System.arraycopy the second half one slot to the right -> insert the target value into the freed slot (a small simulation follows below)
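
The mechanism can be simulated by hand on a plain array (a sketch of the idea, not ArrayList's actual code; insertAt is a made-up helper):

// Simulate ArrayList.add(index, element) on an array that still has spare capacity.
def insertAt(elementData: Array[Int], size: Int, index: Int, value: Int): Int = {
  require(size < elementData.length, "the real ArrayList would call grow() first")
  // shift the second half one slot to the right, freeing elementData(index)
  System.arraycopy(elementData, index, elementData, index + 1, size - index)
  // drop the new value into the freed slot
  elementData(index) = value
  size + 1
}

val data = new Array[Int](10)
Array(10, 20, 30, 40).copyToArray(data)
val newSize = insertAt(data, size = 4, index = 2, value = 25)
println(data.take(newSize).mkString(", ")) // 10, 20, 25, 30, 40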

3 LinkedList

Short description Link
A main document that I’ve studied https://blog.csdn.net/zxt0601/article/details/77341098

Questions:

  • Why doesn’t LinkedList implement the isEmpty() method directly?

Problem solved:

  • How does LinkedList treat a value that is null?
    • LinkedList finds an index using equals() on the key object, e.g. LinkedList.indexOf(Object o). When we call indexOf(null), it has to avoid calling null.equals() and compares with == instead.
    • LinkedList.contains() also calls indexOf(Object o). (A quick check follows below.)
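
A quick check of that behavior:

val list = new java.util.LinkedList[String]()
list.add("a")
list.add(null)
list.add("b")
// indexOf(null) walks the nodes comparing with ==, so no NullPointerException is thrown
println(list.indexOf(null))  // 1
println(list.contains(null)) // true, because contains() delegates to indexOf()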

4 PriorityQueue

Short description Link
PriorityQueue source code analysis doc with a little problem https://blog.csdn.net/codejas/article/details/85144502

The linked analysis describes PriorityQueue as a min-heap. Note that with the default natural ordering, java.util.PriorityQueue is indeed a min-heap: the head of the queue is the least element. It only behaves as a max-heap when a reversed Comparator is supplied, as the quick check below shows.
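
A quick check (Scala calling the Java collection directly):

val pq = new java.util.PriorityQueue[Integer]()
pq.add(5); pq.add(1); pq.add(3)
println(pq.poll()) // 1 -- the head is the least element, i.e. a min-heap by natural ordering

// a max-heap needs an explicit reversed comparator
val maxHeap = new java.util.PriorityQueue[Integer](java.util.Comparator.reverseOrder[Integer]())
maxHeap.add(5); maxHeap.add(1); maxHeap.add(3)
println(maxHeap.poll()) // 5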

Pain points

When writing a Spark application we can set an alias on a DataFrame, but we cannot read a DataFrame’s alias back. This is because when we call df.alias(), Spark creates an org.apache.spark.sql.catalyst.plans.logical.SubqueryAlias, which is a logical plan node. In other words, the alias is not a field stored on the DataFrame; it is just part of the logical plan that gets executed at runtime.

If we set an alias on a DataFrame in one function and want to use that alias in another function, we have to declare a global field so that both functions can see the same alias. This is a code smell.

def painPoint(): Unit = {
  val theAlias = "an_alias"

  def function1(df: DataFrame): DataFrame = {
    df.alias(theAlias)
  }

  def function2(df: DataFrame): DataFrame = {
    df.join(getOsAndPrice.alias("osAndPrice"), Seq("id"), "left_outer")
      .drop(col(s"$theAlias.os")) // Here we have to keep the 'theAlias' field around
      // drop(s"$theAlias.os") doesn't work; we will see the root cause later
  }

  function2(function1(getOsAndSize)).show(1000, false)
}

Resolve

If we had a global manager to help us manage the alias, the code could change to this.

def painPointResolve(): Unit = {
  def function1(df: DataFrame): DataFrame = {
    df.transform(setAlias("an_alias"))
  }

  def function2(df: DataFrame): DataFrame = {
    df.join(getOsAndPrice.alias("osAndPrice"), Seq("id"), "left_outer")
      .drop(col(s"${getAlias(df)}.os"))
  }

  function2(function1(getOsAndSize)).show(1000, false)
}

We can use two Maps to implement this manager. To avoid concurrency issues, we use ConcurrentHashMap to store the mapping, and we wrap the get and set operations in synchronized blocks to make them atomic.

package net.i18ndev.spark.alias

import org.apache.spark.sql.DataFrame

import java.util.concurrent.ConcurrentHashMap

object DfAliasManager {

  lazy private val toAlias: ConcurrentHashMap[DataFrame, String] = new ConcurrentHashMap[DataFrame, String]()
  lazy private val toDataFrame: ConcurrentHashMap[String, DataFrame] = new ConcurrentHashMap[String, DataFrame]()

  def setAlias(alias: String): DataFrame => DataFrame = {
    df =>
      this.synchronized {
        val aliasedDf = df.alias(alias)
        toAlias.put(aliasedDf, alias)
        toDataFrame.put(alias, aliasedDf)
        aliasedDf
      }
  }

  def getAlias(df: DataFrame): String = {
    this.synchronized {
      toAlias.get(df)
    }
  }

  def getDataFrame(alias: String): DataFrame = {
    this.synchronized {
      toDataFrame.get(alias)
    }
  }

}

Reference

The code in this article: https://github.com/Solodye/spark-demo/blob/main/src/main/scala/net/i18ndev/spark/alias/DfAliasManagerDemo.scala

When developing Spark applications that process tables, we may encounter this issue.

Exception in thread "main" org.apache.spark.sql.AnalysisException: Reference 'os' is ambiguous, could be: default.year_table.os, os, osAndSize.os.;
at org.apache.spark.sql.catalyst.expressions.package$AttributeSeq.resolve(package.scala:259)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveChildren(LogicalPlan.scala:101)
at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences$$anonfun$42.apply(Analyzer.scala:1001)
at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences$$anonfun$42.apply(Analyzer.scala:1003)
at org.apache.spark.sql.catalyst.analysis.package$.withPosition(package.scala:53)
...

The root cause is that our DataFrame has two (or more) columns with the same name, usually introduced by an inappropriate join. To handle this, we should drop the redundant columns before joining so that no duplicate column names remain, or use aliases to remove them after the join.

We can reproduce this problem with simpler code.

1. Reproduce

Imagine a company whose business is selling operating systems. We already have the basic information from several data sources.

First, we have the most important table year_table, which is ingested from MySQL to Hive.

+---+-------+----+
|id |os     |year|
+---+-------+----+
|1  |ubuntu |2005|
|2  |xiaomi |2010|
|3  |suse   |2015|
|4  |samsung|2020|
+---+-------+----+

From the data we can see that the ubuntu OS we sell was launched in 2005, and we also have 3 other OSes. When we read this data we should read it from Hive, so we simulate year_table like this.

def getOsAndYear: DataFrame = {
  val columns = Seq("id", "os", "year")
  val data = Seq(
    (1, "ubuntu", 2005),
    (2, "xiaomi", 2010),
    (3, "suse", 2015),
    (4, "samsung", 2020)
  )
  val osAndYear = spark.createDataFrame(data).toDF(columns: _*)
  val schema = "default"
  osAndYear.write.mode("overwrite").saveAsTable(s"$schema.year_table")
  spark.read.table(s"$schema.year_table")
}

Then we have price_table and size_table, which record the price and the disk space each OS occupies. These two tables come from manually maintained static data, which is not stable: some OS names are null, but the IDs are still usable.

def getOsAndPrice: DataFrame = {
  val columns = Seq("id", "os", "price")
  val data = Seq(
    (1, "ubuntu", 50),
    (2, "xiaomi", 100),
    (3, null, 150),
    (4, "samsung", 200)
  )
  spark.createDataFrame(data).toDF(columns: _*)
}

def getOsAndSize: DataFrame = {
  val columns = Seq("id", "os", "size")
  val data = Seq(
    (1, "ubuntu", 3444),
    (2, null, 4546),
    (3, "suse", 5747),
    (4, "samsung", 687687)
  )
  spark.createDataFrame(data).toDF(columns: _*)
}

Since the os name is not stable, we should join by id.

val yearAndPrice = getOsAndYear.join(broadcast(getOsAndPrice), Seq("id"), "left_outer")
val osAndSize = getOsAndSize.alias("osAndSize")
val yearPriceAndSize = yearAndPrice.join(osAndSize,
yearAndPrice("id") <=> col("osAndSize.id"), "left_outer")
yearPriceAndSize.show(1000, false)
+---+-------+----+-------+-----+---+-------+------+
|id |os |year|os |price|id |os |size |
+---+-------+----+-------+-----+---+-------+------+
|1 |ubuntu |2005|ubuntu |50 |1 |ubuntu |3444 |
|2 |xiaomi |2010|xiaomi |100 |2 |null |4546 |
|3 |suse |2015|null |150 |3 |suse |5747 |
|4 |samsung|2020|samsung|200 |4 |samsung|687687|
+---+-------+----+-------+-----+---+-------+------+

Something weird has happened: our table now has 3 identical os columns. If we try to select the os column, the exception is reproduced.

yearPriceAndSize.select("os") 
Exception in thread "main" org.apache.spark.sql.AnalysisException: Reference 'os' is ambiguous, could be: default.year_table.os, os, osAndSize.os.;
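
Before looking at the structural solutions below, note that the error message itself hints at a quick workaround: referring to the column through one of the listed qualifiers instead of the bare name. A sketch, reusing the osAndSize alias set above (it only works for qualifiers that actually exist in the plan):

import org.apache.spark.sql.functions.col

// Disambiguate by qualifying the column instead of selecting the bare "os"
yearPriceAndSize.select(col("osAndSize.os")).show(false)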

2. Solution

2.1 Drop redundant columns before join

val yearAndPrice = getOsAndYear
  .join(broadcast(getOsAndPrice.drop("os") /* drop os */), Seq("id"), "left_outer")
val osAndSize = getOsAndSize.alias("osAndSize")
val yearPriceAndSize = yearAndPrice.join(osAndSize.drop("os"), // drop os
  yearAndPrice("id") <=> col("osAndSize.id"), "left_outer")
yearPriceAndSize.select("os").show(100, false)
+-------+
|os |
+-------+
|ubuntu |
|xiaomi |
|suse |
|samsung|
+-------+

Of course this solution is cumbersome, because we have to drop the redundant columns again and again. In a live environment we often only know the important columns of a table rather than all of them, so removing duplicates manually every time means repeating ourselves over and over.

2.2 Use a reusable helper to drop redundant columns relative to a main DataFrame

We can use a Set to work out which columns to keep or remove. Usually a join has a main DataFrame, i.e. the DataFrame to which we only want to add some necessary columns after the join. For example, year_table is the main DataFrame in the first join, because we want to add the price column to year_table rather than add the year to the others.

After the join, we apply a select that keeps only the columns coming from the main DataFrame plus the columns the main DataFrame does not already have. This is what clearRedundant does.

def dropRedundantColumnsAfterJoin(): Unit = {
  val osAndPrice = broadcast(getOsAndPrice).alias("osAndPrice")
  val osAndYear = getOsAndYear.alias("osAndYear")
  val osAndSize = getOsAndSize.alias("osAndSize")
  val yearAndPrice = osAndYear
    .join(osAndPrice, Seq("id"), "left_outer")
    .transform(clearRedundant(osAndYear, "osAndYear", osAndPrice, "osAndPrice"))
    .alias("yearAndPrice")
  val yearPriceAndSize =
    osAndSize.join(yearAndPrice, Seq("id"), "right_outer")
      .transform(clearRedundant(yearAndPrice, "yearAndPrice", osAndSize, "osAndSize"))
  yearPriceAndSize.show(200, false)
}

def clearRedundant(mainDf: DataFrame, mainDfAlias: String,
                   appendDf: DataFrame, appendDfAlias: String): DataFrame => DataFrame = {
  val mainCols = mainDf.columns.toSet
  val appendDfCols = appendDf.columns.toSet
  // Usually our join appends some columns to a main DataFrame,
  // so the columns to append are the other DataFrame's columns minus
  // the columns the main DataFrame already has
  val appendCols: Set[String] = appendDfCols -- mainCols
  val colsAfterJoin: Set[Column] = {
    // Re-alias each column to its bare name so the joined df carries no stale qualifier
    mainCols.map(c => col(s"$mainDfAlias.$c").as(c)) union
      appendCols.map(c => col(s"$appendDfAlias.$c").as(c))
  }
  finalDf => finalDf.select(colsAfterJoin.toList: _*)
}
+-------+----+---+-----+------+
|os |year|id |price|size |
+-------+----+---+-----+------+
|ubuntu |2005|1 |50 |3444 |
|xiaomi |2010|2 |100 |4546 |
|suse |2015|3 |150 |5747 |
|samsung|2020|4 |200 |687687|
+-------+----+---+-----+------+

Reference

The code in this article: https://github.com/Solodye/spark-demo/blob/main/src/main/scala/net/i18ndev/spark/alias/ReferenceAmbiguousReproduce.scala

How to mock Hive in Spark app: https://stackoverflow.com/questions/55068459/setup-spark-for-ci-how-to-mock-hive-tables

Background

When we use groupBy, it produces a single value for every group.

If we not only want to operate on a group of rows but also want our function to return a value for every input row, we should use a window function.

Example of last_activity_time

Business requirement

We have a user table with 3 columns. Whenever a user clicks in our app, our tracking application saves a tracking record. For example: user Tom visited our website from his Linux device on day 1, then from his iOS device on days 2 and 3.

user_name active_timestamp os
Tom 1 linux
Tom 2 ios
Tom 3 ios
Speike 3 windows
Speike 4 ios
Speike 5 ios
Jerry 6 android
Jerry 7 ios
Jerry 8 windows
Jerry 9 linux
Jerry 10 macbook

Our requirement is to calculate the last active timestamp of Tom, Jerry and Speike.

Implementation

Prepare for data

def prepareExampleData: DataFrame = {
  val columns = Seq("user_name", "activity_time", "os")
  val data = Seq(
    ("Tom", 1, "linux"),
    ("Tom", 2, "ios"),
    ("Tom", 3, "ios"),
    ("Speike", 3, "windows"),
    ("Speike", 4, "ios"),
    ("Speike", 5, "ios"),
    ("Jerry", 6, "android"),
    ("Jerry", 7, "ios"),
    ("Jerry", 8, "windows"),
    ("Jerry", 9, "linux"),
    ("Jerry", 10, "macbook")
  )
  spark.createDataFrame(data).toDF(columns: _*)
}

Use Spark SQL

def calculateLastActivitySql: DataFrame => DataFrame = {
  df =>
    df.createTempView("user_table")
    df.sqlContext.sql(
      """
        |select *,
        |first(activity_time) over(partition by user_name order by activity_time desc) as last_activity_time
        |from user_table""".stripMargin
    )
}

Use Spark DataFrame API

def calculateLastActivity: DataFrame => DataFrame = {
  df =>
    val window = Window.partitionBy("user_name").orderBy(col("activity_time").desc)
    df.withColumn("last_activity_time", first("activity_time").over(window))
}
+---------+-------------+-------+------------------+
|user_name|activity_time| os|last_activity_time|
+---------+-------------+-------+------------------+
| Tom| 3| ios| 3|
| Tom| 2| ios| 3|
| Tom| 1| linux| 3|
| Jerry| 10|macbook| 10|
| Jerry| 9| linux| 10|
| Jerry| 8|windows| 10|
| Jerry| 7| ios| 10|
| Jerry| 6|android| 10|
| Speike| 5| ios| 5|
| Speike| 4| ios| 5|
| Speike| 3|windows| 5|
+---------+-------------+-------+------------------+

Compare with groupBy

def compareWithGroupby: DataFrame => DataFrame = {
  df =>
    val result = df.groupBy("user_name").agg(
      max("activity_time").as("last_activity_time")
    )
    df.join(result, usingColumn = "user_name")
      .select("user_name", "activity_time", "os", "last_activity_time")
}
== Physical Plan ==
*(3) Project [user_name#6, activity_time#7, os#8, last_activity_time#16]
+- *(3) BroadcastHashJoin [user_name#6], [user_name#19], Inner, BuildRight
   :- LocalTableScan [user_name#6, activity_time#7, os#8]
   +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, true]))
      +- *(2) HashAggregate(keys=[user_name#19], functions=[max(activity_time#20)])
         +- Exchange hashpartitioning(user_name#19, 200)
            +- *(1) HashAggregate(keys=[user_name#19], functions=[partial_max(activity_time#20)])
               +- LocalTableScan [user_name#19, activity_time#20]

== Physical Plan ==
Window [first(activity_time#7, false) windowspecdefinition(user_name#6, activity_time#7 DESC NULLS LAST, specifiedwindowframe(RangeFrame, unboundedpreceding$(), currentrow$())) AS last_activity_time#34], [user_name#6], [activity_time#7 DESC NULLS LAST]
+- *(1) Sort [user_name#6 ASC NULLS FIRST, activity_time#7 DESC NULLS LAST], false, 0
   +- Exchange hashpartitioning(user_name#6, 200)
      +- LocalTableScan [user_name#6, activity_time#7, os#8]

Conditioned window implementation

Now we have another requirement: calculate the last_ios_active_time.

last_ios_active_time represents the last time the user was active on the iOS platform.

For example, Jerry logged in to our website from iOS on day 7 and from a Windows device on day 8, so Jerry’s last_ios_active_time is 7.

user_name active_timestamp os last_ios_active_time
Tom 1 linux
Tom 2 ios
Tom 3 ios 3
Speike 3 windows
Speike 4 ios
Speike 5 ios 5
Jerry 6 android
Jerry 7 ios 7
Jerry 8 windows
Jerry 9 linux
Jerry 10 macbook

If we want to calculate this metric, we need a window with a condition: only consider the records where os = “ios”.

To implement this, we can derive a temporary column for the window to order by.

user_name active_timestamp os _tmp_window_order_timestamp
Tom 1 linux null
Tom 2 ios 2
Tom 3 ios 3
Speike 3 windows null
Speike 4 ios 4
Speike 5 ios 5
Jerry 6 android null
Jerry 7 ios 7
Jerry 8 windows null
Jerry 9 linux null
Jerry 10 macbook null

And then define a window ordered by this temporary timestamp.

def calcLastIosActivityTime(tmp_col_name: String = "_tmp_window_order_timestamp")
    : DataFrame => DataFrame = {
  df => {
    val conditionedDf = df.withColumn(tmp_col_name,
      when(col("os") === "ios", col("activity_time")).otherwise(null))
    val conditionedWindow =
      Window.partitionBy("user_name").orderBy(col(tmp_col_name).desc)
    conditionedDf.withColumn("last_ios_active_time", first(tmp_col_name).over(conditionedWindow))
  }
}
+---------+-------------+-------+---------------------------+--------------------+
|user_name|activity_time| os|_tmp_window_order_timestamp|last_ios_active_time|
+---------+-------------+-------+---------------------------+--------------------+
| Tom| 3| ios| 3| 3|
| Tom| 2| ios| 2| 3|
| Tom| 1| linux| null| 3|
| Jerry| 7| ios| 7| 7|
| Jerry| 6|android| null| 7|
| Jerry| 8|windows| null| 7|
| Jerry| 9| linux| null| 7|
| Jerry| 10|macbook| null| 7|
| Speike| 5| ios| 5| 5|
| Speike| 4| ios| 4| 5|
| Speike| 3|windows| null| 5|
+---------+-------------+-------+---------------------------+--------------------+

Boundary

Sometimes we add a redundant orderBy to a window. It is supposed to have no effect, but in some cases a redundant orderBy leads to unexpected results.

Our business requirement is still to calculate last_ios_active_time, and we have two methods to compute it.

private def showWindowBoundary = {
  val df = prepareExampleData
  val tmp_col_name: String = "_tmp_window_order_timestamp"
  val conditionedDf = df.withColumn(tmp_col_name,
    when(col("os") === "ios", col("activity_time")).otherwise(null)).cache()

  // Method 1
  println("Method 1")
  val boundaryWindow =
    Window.partitionBy("user_name").orderBy(col("activity_time").desc)
  conditionedDf.withColumn("last_ios_active_time", max(tmp_col_name).over(boundaryWindow)).show()

  // Method 2
  println("Method 2")
  val userWindow =
    Window.partitionBy("user_name")
  conditionedDf.withColumn("last_ios_active_time", max(tmp_col_name).over(userWindow)).show()
}

Method 1 has a redundant orderBy(col("activity_time").desc), since we ultimately use max to calculate the metric, while Method 2 does not have any orderBy.

It seems that the redundant orderBy should not affect anything, but it turns out that it prevents some records in the partition from being covered by the window calculation.

Method 1
+---------+-------------+-------+---------------------------+--------------------+
|user_name|activity_time| os|_tmp_window_order_timestamp|last_ios_active_time|
+---------+-------------+-------+---------------------------+--------------------+
| Tom| 3| ios| 3| 3|
| Tom| 2| ios| 2| 3|
| Tom| 1| linux| null| 3|
| Jerry| 10|macbook| null| null|
| Jerry| 9| linux| null| null|
| Jerry| 8|windows| null| null|
| Jerry| 7| ios| 7| 7|
| Jerry| 6|android| null| 7|
| Speike| 5| ios| 5| 5|
| Speike| 4| ios| 4| 5|
| Speike| 3|windows| null| 5|
+---------+-------------+-------+---------------------------+--------------------+

From the data we can see that Jerry last used iOS on day 7.

If we order all of Jerry’s tracking data by activity_time descending, there are 3 records before the record with user_name=Jerry and activity_time=7. This means that if the window frame runs from the first row of the partition down to the current row, the frames of the records with activity_time=8, 9 and 10 never reach the iOS record, so they are not covered.

Method 2 calculates the expected value.

Method 2
+---------+-------------+-------+---------------------------+--------------------+
|user_name|activity_time| os|_tmp_window_order_timestamp|last_ios_active_time|
+---------+-------------+-------+---------------------------+--------------------+
| Tom| 1| linux| null| 3|
| Tom| 2| ios| 2| 3|
| Tom| 3| ios| 3| 3|
| Jerry| 6|android| null| 7|
| Jerry| 7| ios| 7| 7|
| Jerry| 8|windows| null| 7|
| Jerry| 9| linux| null| 7|
| Jerry| 10|macbook| null| 7|
| Speike| 3|windows| null| 5|
| Speike| 4| ios| 4| 5|
| Speike| 5| ios| 5| 5|
+---------+-------------+-------+---------------------------+--------------------+

Why null still occurs

We can look at the logical plan. When we use orderBy, the default window frame is unboundedpreceding to currentrow.

conditionedDf.withColumn("last_ios_active_time", max(tmp_col_name).over(boundaryWindow)).explain(true)

== Parsed Logical Plan ==
'Project [user_name#6, activity_time#7, os#8, _tmp_window_order_timestamp#22, max('_tmp_window_order_timestamp) windowspecdefinition('user_name, 'activity_time DESC NULLS LAST, unspecifiedframe$()) AS last_ios_active_time#291]
+- Project [user_name#6, activity_time#7, os#8, CASE WHEN (os#8 = ios) THEN activity_time#7 ELSE cast(null as int) END AS _tmp_window_order_timestamp#22]
+- Project [_1#0 AS user_name#6, _2#1 AS activity_time#7, _3#2 AS os#8]
+- LocalRelation [_1#0, _2#1, _3#2]

== Analyzed Logical Plan ==
user_name: string, activity_time: int, os: string, _tmp_window_order_timestamp: int, last_ios_active_time: int
Project [user_name#6, activity_time#7, os#8, _tmp_window_order_timestamp#22, last_ios_active_time#291]
+- Project [user_name#6, activity_time#7, os#8, _tmp_window_order_timestamp#22, last_ios_active_time#291, last_ios_active_time#291]
+- Window [max(_tmp_window_order_timestamp#22) windowspecdefinition(user_name#6, activity_time#7 DESC NULLS LAST, specifiedwindowframe(RangeFrame, unboundedpreceding$(), currentrow$())) AS last_ios_active_time#291], [user_name#6], [activity_time#7 DESC NULLS LAST]
+- Project [user_name#6, activity_time#7, os#8, _tmp_window_order_timestamp#22]
+- Project [user_name#6, activity_time#7, os#8, CASE WHEN (os#8 = ios) THEN activity_time#7 ELSE cast(null as int) END AS _tmp_window_order_timestamp#22]
+- Project [_1#0 AS user_name#6, _2#1 AS activity_time#7, _3#2 AS os#8]
+- LocalRelation [_1#0, _2#1, _3#2]

When we don’t use orderBy, the default frame is unboundedpreceding to unboundedfollowing.

conditionedDf.withColumn("last_ios_active_time", max(tmp_col_name).over(userWindow)).explain

== Parsed Logical Plan ==
'Project [user_name#865, activity_time#866, os#867, _tmp_window_order_timestamp#871, max('_tmp_window_order_timestamp) windowspecdefinition('user_name, unspecifiedframe$()) AS last_ios_active_time#1383]
+- Project [user_name#865, activity_time#866, os#867, CASE WHEN (os#867 = ios) THEN activity_time#866 ELSE cast(null as int) END AS _tmp_window_order_timestamp#871]
+- Project [_1#859 AS user_name#865, _2#860 AS activity_time#866, _3#861 AS os#867]
+- LocalRelation [_1#859, _2#860, _3#861]

== Analyzed Logical Plan ==
user_name: string, activity_time: int, os: string, _tmp_window_order_timestamp: int, last_ios_active_time: int
Project [user_name#865, activity_time#866, os#867, _tmp_window_order_timestamp#871, last_ios_active_time#1383]
+- Project [user_name#865, activity_time#866, os#867, _tmp_window_order_timestamp#871, last_ios_active_time#1383, last_ios_active_time#1383]
+- Window [max(_tmp_window_order_timestamp#871) windowspecdefinition(user_name#865, specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$())) AS last_ios_active_time#1383], [user_name#865]
+- Project [user_name#865, activity_time#866, os#867, _tmp_window_order_timestamp#871]
+- Project [user_name#865, activity_time#866, os#867, CASE WHEN (os#867 = ios) THEN activity_time#866 ELSE cast(null as int) END AS _tmp_window_order_timestamp#871]
+- Project [_1#859 AS user_name#865, _2#860 AS activity_time#866, _3#861 AS os#867]
+- LocalRelation [_1#859, _2#860, _3#861]

For the record with activity_time=10, the window calculation result is null.

Because of the descending orderBy, each record’s frame runs from the first row of the partition down to the current row. For the records with activity_time=10, 9 and 8 this frame contains only null values of _tmp_window_order_timestamp, so max() returns null.

When it comes to the record with activity_time=7, the frame now includes _tmp_window_order_timestamp=7, so the max() result is 7. The frame of the record with activity_time=6 also includes 7.
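
If the orderBy is genuinely needed, the surprise can also be removed by making the frame explicit so that it always covers the whole partition. A sketch, reusing conditionedDf and tmp_col_name from showWindowBoundary above:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, max}

// Keep the descending ordering but widen the frame back to the whole partition
val fullFrameWindow = Window
  .partitionBy("user_name")
  .orderBy(col("activity_time").desc)
  .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)

conditionedDf.withColumn("last_ios_active_time", max(tmp_col_name).over(fullFrameWindow)).show()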

Reference docs

https://databricks.com/blog/2015/07/15/introducing-window-functions-in-spark-sql.html

https://stackoverflow.com/questions/40048919/what-is-the-difference-between-rowsbetween-and-rangebetween
