[HADOOP] 반면에 withColumn 스파크 Dataframe의 스칼라를 사용하는 방법
HADOOP반면에 withColumn 스파크 Dataframe의 스칼라를 사용하는 방법
이 제 기능 규칙 안부의 mdp_codcat, mdp_idregl를 적용하고, 어레이 BREF의 데이터에있어서의 usedRef changechanges.
def withMdpCodcat(bRef: Broadcast[Array[RefRglSDC]])(dataFrame: DataFrame):DataFrame ={var matchRule = false
var i = 0
while (i < bRef.value.size && !matchRule) {
if ((bRef.value(i).sensop.isEmpty || bRef.value(i).sensop.equals(col("signe")))
&& (bRef.value(i).cdopcz.isEmpty || Lib.matchCdopcz(strTail(col("cdopcz")).toString(), bRef.value(i).cdopcz))
&& (bRef.value(i).libope.isEmpty || Lib.matchRule(col("lib_ope").toString(), bRef.value(i).libope))
&& (bRef.value(i).qualib.isEmpty || Lib.matchRule(col("qualif_lib_ope").toString(), bRef.value(i).qualib))) {
matchRule = true
dataFrame.withColumn("mdp_codcat", lit(bRef.value(i).codcat))
dataFrame.withColumn("mdp_idregl", lit(bRef.value(i).idregl))
dataFrame.withColumn("usedRef", lit("SDC"))
}else{
dataFrame.withColumn("mdp_codcat", lit("NOT_CATEGORIZED"))
dataFrame.withColumn("mdp_idregl", lit("-1"))
dataFrame.withColumn("usedRef", lit(""))
}
i += 1
}
dataFrame
}
dataFrame : "cdenjp", "cdguic", "numcpt", "mdp_codcat", "mdp_idregl"mdp_codcat ","mdp_idregl는 ","usedRef은 "경기는 mdp_idregl, mdp_idregl, 값 BREF와 mdp_idregl을 추가하는 경우
Exemple : 내 dataframe :
val DF = Seq(("tt", "aa","bb"),("tt1", "aa1","bb2"),("tt1", "aa1","bb2")).toDF("t","a","b)
+---+---+---+---+
| t| a| b| c|
+---+---+---+---+
| tt| aa| bb| cc|
|tt1|aa1|bb2|cc3|
+---+---+---+---+
file.text 내용 :
,aa,bb,cc
,aa1,bb2,cc3
tt4,aa4,bb4,cc4
tt1,aa1,,cc6
case class TOTO(a: String, b:String, c: String, d:String)
val text = sc.textFile("file:///home/X176616/file")
val bRef= textFromCsv.map(row => row.split(",", -1))
.map(c => TOTO(c(0), c(1), c(2), c(3))).collect().sortBy(_.a)
def withMdpCodcat(bRef: Broadcast[Array[RefRglSDC]])(dataFrame: DataFrame):DataFrame
dataframe.withColumn("mdp_codcat_new", "NOT_FOUND") //first init not found, change if while if match
var matchRule = false
var i = 0
while (i < bRef.value.size && !matchRule) {
if ((bRef.value(i).a.isEmpty || bRef.value(i).a.equals(signe))
&& (bRef.value(i).b.isEmpty || Lib.matchCdopcz(col(b), bRef.value(i).b))
&& (bRef.value(i).c.isEmpty || Lib.matchRule(col(c), bRef.value(i).c))
)) {
matchRule = true
dataframe.withColumn("mdp_codcat_new", bRef.value(i).d)
dataframe.withColumn("mdp_mdp_idregl_new" = bRef.value(i).e
}
i += 1
}
마침내 진정한 조건의 경우 DF
bRef.value(i).a.isEmpty || bRef.value(i).a.equals(signe))
&& (bRef.value(i).b.isEmpty || Lib.matchCdopcz(b.substring(1).toInt.toString, bRef.value(i).b))
&& (bRef.value(i).c.isEmpty || Lib.matchRule(c, bRef.value(i).c)
+---+---+---+---+-----------+----------+
| t| a| b| c|mdp_codcat |mdp_idregl|
+---+---+---+---+-----------|----------+
| tt| aa| bb| cc|cc | other |
| ab|aa1|bb2|cc3|cc4 | toto | from bRef if true in while
| cd|aa1|bb2|cc3|cc4 | titi |
| b|a1 |b2 |c3 |NO_FOUND |NO_FOUND | (not_found if conditionnal false)
+---+---+---+---+----------------------+
+---+---+---+---+----------------------+
해결법
-
==============================
1.당신은 런타임 값에 따라 dataframe 스키마를 만들 수 없습니다. 나는 간단하게 수행하려고 할 것입니다. 먼저 더 낫다고는 기본 값으로 세 개의 열을 만듭니다
당신은 런타임 값에 따라 dataframe 스키마를 만들 수 없습니다. 나는 간단하게 수행하려고 할 것입니다. 먼저 더 낫다고는 기본 값으로 세 개의 열을 만듭니다
dataFrame.withColumn("mdp_codcat", lit("")) dataFrame.withColumn("mdp_idregl", lit("")) dataFrame.withColumn("usedRef", lit(""))
그런 다음 당신은 당신의 방송 값으로 UDF를 사용할 수 있습니다 :
def mdp_codcat(bRef: Broadcast[Array[RefRglSDC]]) = udf { (field: String) => { // Your while and if stuff // return your update data }}
그리고 각 필드에 각 UDF를 적용 :
dataframe.withColumn("mdp_codcat_new", mdp_codcat(bRef)("mdp_codcat"))
아마 도움이 될 수 있습니다
from https://stackoverflow.com/questions/52831391/how-to-use-withcolumn-spark-dataframe-scala-with-while by cc-by-sa and MIT license
'HADOOP' 카테고리의 다른 글
[HADOOP] 이유는 무엇입니까 단일 출력 파일에 하이브 결과에지도에만 작업 (0) | 2019.10.07 |
---|---|
[HADOOP] Sqoop을 자바 (7) (0) | 2019.10.07 |
[HADOOP] 데이터 노드와 노드 관리자는 의사 클러스터 모드로 시작되지 않는 (아파치 하둡) (0) | 2019.10.07 |
[HADOOP] 너무 많은 작은 파일 HDFS 싱크 수로 (0) | 2019.10.07 |
[HADOOP] HBase를 회복 (0) | 2019.10.07 |