복붙노트

[SCALA] 어레이 스파크 SQL의 요소 범위를 선택

SCALA

어레이 스파크 SQL의 요소 범위를 선택

나는 작업 아래의 작업을 수행하기 위해 스파크 쉘을 사용합니다.

최근 스파크 SQL에서 어레이 열이 테이블을로드.

여기에 같은 대한 DDL은 다음과 같습니다

create table test_emp_arr{
    dept_id string,
    dept_nm string,
    emp_details Array<string>
}

데이터는 다음과 같이 보입니다

+ ------- + ------- + ------------------------------- + | DEPT_ID | dept_nm | emp_details | + ------- + ------- + ------------------------------- + | 10 | 금융 | [존, 눈, 성, 블랙, 네드] | | 20 | IT | | [네드, 아니, 더입니다] + ------- + ------- + ------------------------------- + 나는이 같은 emp_details 열 뭔가를 조회 할 수 있습니다 : sqlContext.sql ( "선택 emp_details emp_details에서 [0]"). 쇼 문제 나는 컬렉션의 요소의 범위를 조회 할 : 작업에 예상 된 쿼리 sqlContext.sql ( "선택 emp_details emp_details에서 [0-2]"). 쇼 또는 sqlContext.sql ( "선택 emp_details [0 : 2]로부터 emp_details")보기. 예상 출력 + ------------------- + | emp_details | + ------------------- + | [존, 눈, 성] | | | [네드, 아니, 없다] + ------------------- + 순수 스칼라에서, 나는 배열 뭔가를 같이있는 경우 : 발에 emp_details = 배열 ​​( "존", "눈", "성", "블랙") 내가 사용하는 0 ~ 2 범위 요소를 얻을 수 있습니다 emp_details.slice (0,3) 저를 반환 어레이 (존, 눈, 성) 나는 불꽃-SQL에서 배열의 이상 동작을 적용 할 수 없습니다입니다. 감사

해결법

  1. ==============================

    1.여기에 원하는 슬라이스 크기 근무의 장점이있는 사용자 정의 함수를 사용하여 솔루션입니다. 그것은 단순히 조각 방법 내장 스칼라 주위 UDF 기능을 구축 :

    여기에 원하는 슬라이스 크기 근무의 장점이있는 사용자 정의 함수를 사용하여 솔루션입니다. 그것은 단순히 조각 방법 내장 스칼라 주위 UDF 기능을 구축 :

    import sqlContext.implicits._
    import org.apache.spark.sql.functions._
    
    val slice = udf((array : Seq[String], from : Int, to : Int) => array.slice(from,to))
    

    데이터의 샘플 예 :

    val df = sqlContext.sql("select array('Jon', 'Snow', 'Castle', 'Black', 'Ned') as emp_details")
    df.withColumn("slice", slice($"emp_details", lit(0), lit(3))).show
    

    예상되는 출력을 생성합니다

    +--------------------+-------------------+
    |         emp_details|              slice|
    +--------------------+-------------------+
    |[Jon, Snow, Castl...|[Jon, Snow, Castle]|
    +--------------------+-------------------+
    

    또한는 SqlContext에서 UDF를 등록하고 다음과 같이 사용할 수 있습니다

    sqlContext.udf.register("slice", (array : Seq[String], from : Int, to : Int) => array.slice(from,to))
    sqlContext.sql("select array('Jon','Snow','Castle','Black','Ned'),slice(array('Jon‌​','Snow','Castle','Black','Ned'),0,3)")
    

    당신은 필요가이 솔루션을 더 이상 점등되지 않습니다

  2. ==============================

    2.스파크 2.4 이후는 슬라이스 기능을 사용할 수 있습니다. ) 파이썬에서 :

    스파크 2.4 이후는 슬라이스 기능을 사용할 수 있습니다. ) 파이썬에서 :

    from pyspark.sql.functions import slice
    
    df = spark.createDataFrame([
        (10, "Finance", ["Jon", "Snow", "Castle", "Black", "Ned"]),
        (20, "IT", ["Ned", "is", "no", "more"])
    ], ("dept_id", "dept_nm", "emp_details"))
    
    df.select(slice("emp_details", 1, 3).alias("empt_details")).show()
    
    +-------------------+
    |       empt_details|
    +-------------------+
    |[Jon, Snow, Castle]|
    |      [Ned, is, no]|
    +-------------------+
    

    에서 스칼라

    import org.apache.spark.sql.functions.slice
    
    val df = Seq(
        (10, "Finance", Seq("Jon", "Snow", "Castle", "Black", "Ned")),
        (20, "IT", Seq("Ned", "is", "no", "more"))
    ).toDF("dept_id", "dept_nm", "emp_details")
    
    df.select(slice($"emp_details", 1, 3) as "empt_details").show
    
    +-------------------+
    |       empt_details|
    +-------------------+
    |[Jon, Snow, Castle]|
    |      [Ned, is, no]|
    +-------------------+
    

    같은 일이 SQL에서 수행 과정이 될 수 있습니다

    SELECT slice(emp_details, 1, 3) AS emp_details FROM df
    

    중대한:

    참고 Seq.slice 달리, 값이 0에서 색인 및 두 번째 인수의 길이입니다, 위치를 종료하지하시기 바랍니다.

  3. ==============================

    3.EDIT2 : 누가 가독성을 희생 UDF하고 싶지 들어 ;-)

    EDIT2 : 누가 가독성을 희생 UDF하고 싶지 들어 ;-)

    당신이 정말로 한 번에 그것을하고 싶은 경우에, 당신은 칼럼의 순서를 반환하는 람다 함수를 만들고 배열을 래핑하는 스칼라을 사용해야합니다. 이것은 조금 관여이지만, 한 단계입니다 :

    val df = List(List("Jon", "Snow", "Castle", "Black", "Ned")).toDF("emp_details")
    
    df.withColumn("slice", array((0 until 3).map(i => $"emp_details"(i)):_*)).show(false)    
    
    
    +-------------------------------+-------------------+
    |emp_details                    |slice              |
    +-------------------------------+-------------------+
    |[Jon, Snow, Castle, Black, Ned]|[Jon, Snow, Castle]|
    +-------------------------------+-------------------+
    

    _ * 소위 가변 함수에리스트를 전달할 마법의 비트를 작동 (이 경우 배열을하는 구조물은 SQL 배열). 그러나 나는있는 그대로이 솔루션을 사용에 대해 조언을 것입니다. 명명 함수 람다 함수 넣어

    def slice(from: Int, to: Int) = array((from until to).map(i => $"emp_details"(i)):_*))
    

    코드 가독성. 하는 것으로 일반적으로, (`UDF를 사용하지 않고) 열 표현에 집착하는 것은 더 나은 성능을 가지고있다.

    편집 : 순서 (당신이 ... 당신의 질문에 질문으로) 당신은 (그것이 가장 읽을의 말을하지 않음) SQL 쿼리 사용하여 스칼라 로직을 생성하는 것과 같은 논리에 따라는 SQL 문에 그것을 할 수에서

    def sliceSql(emp_details: String, from: Int, to: Int): String = "Array(" + (from until to).map(i => "emp_details["+i.toString+"]").mkString(",") + ")"
    val sqlQuery = "select emp_details,"+ sliceSql("emp_details",0,3) + "as slice from emp_details"
    
    sqlContext.sql(sqlQuery).show
    
    +-------------------------------+-------------------+
    |emp_details                    |slice              |
    +-------------------------------+-------------------+
    |[Jon, Snow, Castle, Black, Ned]|[Jon, Snow, Castle]|
    +-------------------------------+-------------------+
    

    당신이 반복이 중지되는 요소보다는 마지막으로 촬영 한 요소를 제공하기 위해에 의한 때까지 대체 할 수 있습니다.

  4. ==============================

    4.당신은 세 값의 새로운 배열 아웃을 구축하는 기능 배열을 사용할 수 있습니다 :

    당신은 세 값의 새로운 배열 아웃을 구축하는 기능 배열을 사용할 수 있습니다 :

    import org.apache.spark.sql.functions._
    
    val input = sqlContext.sql("select emp_details from emp_details")
    
    val arr: Column = col("emp_details")
    val result = input.select(array(arr(0), arr(1), arr(2)) as "emp_details")
    
    val result.show()
    // +-------------------+
    // |        emp_details|
    // +-------------------+
    // |[Jon, Snow, Castle]|
    // |      [Ned, is, no]|
    // +-------------------+
    
  5. ==============================

    5.예를 들어 :

    예를 들어 :

    fs.selectExpr("((split(emp_details, ','))[0]) as e1,((split(emp_details, ','))[1]) as e2,((split(emp_details, ','))[2]) as e3);
    
  6. ==============================

    6.여기 내 일반적인 슬라이스 UDF, 어떤 유형의 지원 배열입니다. 당신이 사전에 요소 유형을 알 필요가 있기 때문에 약간의 추한 비트.

    여기 내 일반적인 슬라이스 UDF, 어떤 유형의 지원 배열입니다. 당신이 사전에 요소 유형을 알 필요가 있기 때문에 약간의 추한 비트.

    import org.apache.spark.sql.types._
    import org.apache.spark.sql.functions._
    
    def arraySlice(arr: Seq[AnyRef], from: Int, until: Int): Seq[AnyRef] =
      if (arr == null) null else arr.slice(from, until)
    
    def slice(elemType: DataType): UserDefinedFunction = 
      udf(arraySlice _, ArrayType(elemType)
    
    fs.select(slice(StringType)($"emp_details", 1, 2))
    
  7. ==============================

    7.당신의 사람들을 위해 스파크 <2.4을 사용하여 붙어과 슬라이스 기능이없는, 여기 UDF를 사용하지 않는 pySpark의 솔루션 (스칼라는 매우 유사하다)입니다. 대신은 스파크 SQL 함수의 CONCAT_WS, SUBSTRING_INDEX, 분할을 사용합니다.

    당신의 사람들을 위해 스파크 <2.4을 사용하여 붙어과 슬라이스 기능이없는, 여기 UDF를 사용하지 않는 pySpark의 솔루션 (스칼라는 매우 유사하다)입니다. 대신은 스파크 SQL 함수의 CONCAT_WS, SUBSTRING_INDEX, 분할을 사용합니다.

    이것은 단지 문자열 배열 함께 작동합니다. 그것은 다른 종류의 배열로 작업하려면, 당신은 당신이 배열 '슬라이스'이 후 먼저 다음 원래의 형태로 다시 캐스팅 문자열로 캐스팅해야합니다.

    from pyspark.sql import SparkSession
    from pyspark.sql import functions as F
    
    spark = (SparkSession.builder
        .master('yarn')
        .appName("array_slice")
        .getOrCreate()
    )
    
    emp_details = [
        ["Jon", "Snow", "Castle", "Black", "Ned"],
        ["Ned", "is", "no", "more"]
    ]
    
    df1 = spark.createDataFrame(
        [tuple([emp]) for emp in emp_details],
        ["emp_details"]
    )
    
    df1.show(truncate=False)
    
    +-------------------------------+
    |emp_details                    |
    +-------------------------------+
    |[Jon, Snow, Castle, Black, Ned]|
    |[Ned, is, no, more]            |
    +-------------------------------+
    
    last_string = 2
    
    df2 = (
        df1
        .withColumn('last_string', (F.lit(last_string)))
        .withColumn('concat', F.concat_ws(" ", F.col('emp_details')))
        .withColumn('slice', F.expr("substring_index(concat, ' ', last_string + 1)" ))
        .withColumn('slice', F.split(F.col('slice'), ' '))
        .select('emp_details', 'slice')
    )
    
    df2.show(truncate=False)
    
    +-------------------------------+-------------------+
    |emp_details                    |slice              |
    +-------------------------------+-------------------+
    |[Jon, Snow, Castle, Black, Ned]|[Jon, Snow, Castle]|
    |[Ned, is, no, more]            |[Ned, is, no]      |
    +-------------------------------+-------------------+
    
  8. ==============================

    8.중첩 된 분할을 사용합니다 :

    중첩 된 분할을 사용합니다 :

    분할 (분할 (CONCAT_WS ( ''emp_details) CONCAT ( ''emp_details [3])) [0], '')

    scala> import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.SparkSession
    
    scala> val spark=SparkSession.builder().getOrCreate()
    spark: org.apache.spark.sql.SparkSession = org.apache.spark.sql.SparkSession@1d637673
    
    scala> val df = spark.read.json("file:///Users/gengmei/Desktop/test/test.json")
    18/12/11 10:09:32 WARN ObjectStore: Failed to get database global_temp, returning NoSuchObjectException
    df: org.apache.spark.sql.DataFrame = [dept_id: bigint, dept_nm: string ... 1 more field]
    
    scala> df.createOrReplaceTempView("raw_data")
    
    scala> df.show()
    +-------+-------+--------------------+
    |dept_id|dept_nm|         emp_details|
    +-------+-------+--------------------+
    |     10|Finance|[Jon, Snow, Castl...|
    |     20|     IT| [Ned, is, no, more]|
    +-------+-------+--------------------+
    
    
    scala> val df2 = spark.sql(
         | s"""
         | |select dept_id,dept_nm,split(split(concat_ws(',',emp_details),concat(',',emp_details[3]))[0],',') as emp_details from raw_data
         | """)
    df2: org.apache.spark.sql.DataFrame = [dept_id: bigint, dept_nm: string ... 1 more field]
    
    scala> df2.show()
    +-------+-------+-------------------+
    |dept_id|dept_nm|        emp_details|
    +-------+-------+-------------------+
    |     10|Finance|[Jon, Snow, Castle]|
    |     20|     IT|      [Ned, is, no]|
    +-------+-------+-------------------+
    
  9. from https://stackoverflow.com/questions/40134975/selecting-a-range-of-elements-in-an-array-spark-sql by cc-by-sa and MIT license