복붙노트

[REDIS] DataFrame 지붕 EET [(문자열, 문자열)] 변환

REDIS

DataFrame 지붕 EET [(문자열, 문자열)] 변환

나는 org.apache.spark.rdd.RDD Databricks에서 [(문자열, 문자열)]에 org.apache.spark.sql.DataFrame을 변환 할. 누구든지 도움이 수 있습니까?

배경 (더 ​​나은 솔루션도 가능) : I는 (일부 단계 후) 2 열 데이터 프레임이되는 카프카 스트림을 갖는다. 나는 값으로 키와 두 번째 열 등의 레디 스 캐시, 첫 번째 열이 점을 넣고 싶습니다.

보다 구체적으로, 입력의 유형이있다 : lastContacts : org.apache.spark.sql.DataFrame = serialNumber가 : 문자열 LASTMODIFIED : BIGINT]. 다음과 같이 나는 레디 스에 투입하려고 :

sc.toRedisKV(lastContacts)(redisConfig)

오류 메시지는 다음과 같습니다 :

notebook:20: error: type mismatch;
 found   : org.apache.spark.sql.DataFrame
    (which expands to)  org.apache.spark.sql.Dataset[org.apache.spark.sql.Row]
 required: org.apache.spark.rdd.RDD[(String, String)]
sc.toRedisKV(lastContacts)(redisConfig)

난 이미 도움 일부 (기능 .rdd 같은) 생각하지만 아무도 주위했다.

해결법

  1. ==============================

    1.다른 RDD 요소에 행을 매핑 할 경우 (행 => ...)는 RDD에 dataframe을 변환 df.map 사용할 수 있습니다.

    다른 RDD 요소에 행을 매핑 할 경우 (행 => ...)는 RDD에 dataframe을 변환 df.map 사용할 수 있습니다.

    예를 들면 :

    val df = Seq(("table1",432),
          ("table2",567),
          ("table3",987),
          ("table1",789)).
          toDF("tablename", "Code").toDF()
    
        df.show()
    
        +---------+----+
    |tablename|Code|
    +---------+----+
    |   table1| 432|
    |   table2| 567|
    |   table3| 987|
    |   table1| 789|
    +---------+----+
    
        val rddDf = df.map(r => (r(0), r(1))).rdd // Type:RDD[(Any,Any)]
    
        OR
    
        val rdd = df.map(r => (r(0).toString, r(1).toString)).rdd  //Type: RDD[(String,String)]
    

    AnalysisException에 대한 https://community.hortonworks.com/questions/106500/error-in-spark-streaming-kafka-integration-structu.html을 참조하십시오 : 소스를 스트리밍 쿼리가 writeStream.start으로 실행되어야합니다 ()

    당신은) (query.awaitTermination를 사용하여 쿼리의 종료를 기다릴 필요 쿼리가 활성화 된 상태에서 종료에서 프로세스를 방지합니다.

  2. from https://stackoverflow.com/questions/55335877/dataframe-to-rddstring-string-conversion by cc-by-sa and MIT license