복붙노트

[SCALA] 어떻게 스칼라 스트림 클래스와 큰 CSV 파일을 읽습니까?

SCALA

어떻게 스칼라 스트림 클래스와 큰 CSV 파일을 읽습니까?

어떻게 스칼라 스트림과 큰 CSV 파일 (> 1 기가)를 읽을 수 있습니까? 당신은 코드 예제가 있습니까? 또는 먼저 메모리에로드하지 않고 큰 CSV 파일을 읽기 위해 다른 방법을 사용해야합니까?

해결법

  1. ==============================

    1.그냥 Source.fromFile (...)를 사용합니다. getLines을 이미 언급 한 바와 같이.

    그냥 Source.fromFile (...)를 사용합니다. getLines을 이미 언급 한 바와 같이.

    반환 이미 게으른 반복자는, (다시 읽을 수 있도록, 이전 memoized 할 값을 검색 싶어 게으른 컬렉션으로 스트림을 사용하십시오)

    당신이 메모리 문제가 있어요 경우, 문제는 getLines 후 무슨 일을하는지에 거짓말을합니다. 엄격한 수집을 강제로 toList 같은 조작은, 문제의 원인이됩니다.

  2. ==============================

    2.난 당신이 스트림과 스칼라의 collection.immutable.Stream을 의미하지 않는다 희망한다. 이것은 당신이 원하는 없습니다. 스트림 게으른이지만, 메모이 제이션을 수행합니다.

    난 당신이 스트림과 스칼라의 collection.immutable.Stream을 의미하지 않는다 희망한다. 이것은 당신이 원하는 없습니다. 스트림 게으른이지만, 메모이 제이션을 수행합니다.

    난 당신이 할 계획 무엇을 알고 있지만, 단지 메모리의 높은 금액을 사용하지 않고 아주 잘 작동합니다 파일 라인 별을 읽는하지 않습니다.

    getLines는 게으르게 평가해야하고 (파일이 AFAIK 이상 2³² 라인을 가지고 있지 않는 한) 충돌해서는 안된다. 만약 그렇다면, #scala에 문의하거나 버그 티켓을 파일 (또는 두 가지를 모두 수행).

  3. ==============================

    3.당신은 전체 파일의 내용을 모두 한 번에 메모리에로드 할 필요 피하면서 큰 파일을 줄 단위를 처리하기 위해 찾고 있다면, 당신은 반복자는 scala.io.Source에 의해 반환 사용할 수 있습니다.

    당신은 전체 파일의 내용을 모두 한 번에 메모리에로드 할 필요 피하면서 큰 파일을 줄 단위를 처리하기 위해 찾고 있다면, 당신은 반복자는 scala.io.Source에 의해 반환 사용할 수 있습니다.

    제가 사용 사례 정확히 이러한 유형의 사용 적은 기능 tryProcessSource (두 개의 서브 - 기능을 포함하는)을 갖는다. 함수는 첫 번째 요구되는 네 개의 파라미터를 채택한다. 다른 매개 변수는 제정신 기본값을 제공합니다.

    다음은 함수 프로파일 (전체 기능 구현이 하단에)입니다 :

    def tryProcessSource(
      file: File,
      parseLine: (Int, String) => Option[List[String]] =
        (index, unparsedLine) => Some(List(unparsedLine)),
      filterLine: (Int, List[String]) => Option[Boolean] =
        (index, parsedValues) => Some(true),
      retainValues: (Int, List[String]) => Option[List[String]] =
        (index, parsedValues) => Some(parsedValues),
    ): Try[List[List[String]]] = {
      ???
    }
    

    첫 번째 매개 변수 파일 : 파일이 필요합니다. 그리고 그것은 CSV처럼, 선 중심의 텍스트 파일을 가리키는 java.io.File의 단지 유효한 인스턴스입니다.

    두 번째 매개 변수, parseLine : (INT, 문자열) => 옵션 [목록 [문자열]]는 선택 사항입니다. 제공되는 경우에, 두 개의 입력 매개 변수를받을 것으로 예상 함수해야합니다; 인덱스 : INT, unparsedLine : 문자열. 그리고 옵션 [목록 [문자열]을 반환합니다. 이 함수는 유효 열 값 이루어진 일부 래핑리스트 [문자열] 돌아갈 수있다. 아니면 전체 스트리밍 프로세스가 초기에 중단되어 나타내는 None을 반환 할 수 있습니다. 이 매개 변수를 제공하지 않으면 (인덱스, 선) => 일부 (목록 (선))의 기본값이 제공됩니다. 전체 라인이 기본 결과는 단일 문자열 값으로 반환된다.

    세 번째 매개 변수, filterLine : (INT, 목록 [문자열]) => 옵션 [부울], 선택 사항입니다. 제공되는 경우에, 두 개의 입력 매개 변수를받을 것으로 예상 함수해야합니다; 인덱스 : INT, parsedValues ​​: 목록 [문자열]. 그리고 옵션 [부울]를 반환합니다. 이 함수는 특정 라인 출력에 포함되어야하는지 여부를 나타내는 일부 래핑 부울을 리턴 할 수있다. 아니면 전체 스트리밍 프로세스가 초기에 중단되어 나타내는 None을 반환 할 수 있습니다. 이 매개 변수가 제공되지 않으면, 디폴트 값 (인덱스 값) => 일부 (사실)가 제공됩니다. 모든 라인이 기본 결과는 포함되지.

    네 번째와 마지막 매개 변수, retainValues ​​: (INT는 목록 [문자열]) => 옵션 [목록 [문자열]]는 선택 사항입니다. 제공되는 경우에, 두 개의 입력 매개 변수를받을 것으로 예상 함수해야합니다; 인덱스 : INT, parsedValues ​​: 목록 [문자열]. 그리고 옵션 [목록 [문자열]을 반환합니다. 이 기능은 일부 부분 집합 및 / 또는 기존의 열 값의 변경으로 구성된 일부 포장 목록 [문자열]을 반환 할 수 있습니다. 아니면 전체 스트리밍 프로세스가 초기에 중단되어 나타내는 None을 반환 할 수 있습니다. 이 파라미터가 제공되지 않는 경우 (인덱스 값)의 디폴트 값 => 일부 (값)이 제공된다. 값에서이 기본 결과는 두 번째 매개 변수, parseLine에 의해 구문 분석.

    다음 내용 (4 선)을 가진 파일을 고려 :

    street,street2,city,state,zip
    100 Main Str,,Irving,TX,75039
    231 Park Ave,,Irving,TX,75039
    1400 Beltline Rd,Apt 312,Dallas,Tx,75240
    

    다음 호출 프로필 ...

    val tryLinesDefaults =
      tryProcessSource(new File("path/to/file.csv"))
    

    ... tryLinesDefaults (파일의 변경되지 않은 내용)이 출력 결과 :

    Success(
      List(
        List("street,street2,city,state,zip"),
        List("100 Main Str,,Irving,TX,75039"),
        List("231 Park Ave,,Irving,TX,75039"),
        List("1400 Beltline Rd,Apt 312,Dallas,Tx,75240")
      )
    )
    

    다음 호출 프로필 ...

    val tryLinesParseOnly =
      tryProcessSource(
          new File("path/to/file.csv")
        , parseLine =
            (index, unparsedLine) => Some(unparsedLine.split(",").toList)
      )
    

    ... tryLinesParseOnly (각 열 값에 해석 각 라인)에 대해이 출력 결과 :

    Success(
      List(
        List("street","street2","city","state","zip"),
        List("100 Main Str","","Irving,TX","75039"),
        List("231 Park Ave","","Irving","TX","75039"),
        List("1400 Beltline Rd","Apt 312","Dallas","Tx","75240")
      )
    )
    

    다음 호출 프로필 ...

    val tryLinesIrvingTxNoHeader =
      tryProcessSource(
          new File("C:/Users/Jim/Desktop/test.csv")
        , parseLine =
            (index, unparsedLine) => Some(unparsedLine.split(",").toList)
        , filterLine =
            (index, parsedValues) =>
              Some(
                (index != 0) && //skip header line
                (parsedValues(2).toLowerCase == "Irving".toLowerCase) && //only Irving
                (parsedValues(3).toLowerCase == "Tx".toLowerCase)
              )
      )
    

    ... tryLinesIrvingTxNoHeader (각 열 값, 헤더 또는 텍사스 주 어빙에 단지 2 열의 각 행에 해석)이 출력 결과 :

    Success(
      List(
        List("100 Main Str","","Irving,TX","75039"),
        List("231 Park Ave","","Irving","TX","75039"),
      )
    )
    

    여기에 전체 tryProcessSource 기능 구현이다 :

    import scala.io.Source
    import scala.util.Try
    
    import java.io.File
    
    def tryProcessSource(
      file: File,
      parseLine: (Int, String) => Option[List[String]] =
        (index, unparsedLine) => Some(List(unparsedLine)),
      filterLine: (Int, List[String]) => Option[Boolean] =
        (index, parsedValues) => Some(true),
      retainValues: (Int, List[String]) => Option[List[String]] =
        (index, parsedValues) => Some(parsedValues)
    ): Try[List[List[String]]] = {
      def usingSource[S <: Source, R](source: S)(transfer: S => R): Try[R] =
        try {Try(transfer(source))} finally {source.close()}
      def recursive(
        remaining: Iterator[(String, Int)],
        accumulator: List[List[String]],
        isEarlyAbort: Boolean =
          false
      ): List[List[String]] = {
        if (isEarlyAbort || !remaining.hasNext)
          accumulator
        else {
          val (line, index) =
            remaining.next
          parseLine(index, line) match {
            case Some(values) =>
              filterLine(index, values) match {
                case Some(keep) =>
                  if (keep)
                    retainValues(index, values) match {
                      case Some(valuesNew) =>
                        recursive(remaining, valuesNew :: accumulator) //capture values
                      case None =>
                        recursive(remaining, accumulator, isEarlyAbort = true) //early abort
                    }
                  else
                    recursive(remaining, accumulator) //discard row
                case None =>
                  recursive(remaining, accumulator, isEarlyAbort = true) //early abort
              }
            case None =>
              recursive(remaining, accumulator, isEarlyAbort = true) //early abort
          }
        }
      }
      Try(Source.fromFile(file)).flatMap(
        bufferedSource =>
          usingSource(bufferedSource) {
            source =>
              recursive(source.getLines().buffered.zipWithIndex, Nil).reverse
          }
      )
    }
    

    이 솔루션은 상대적으로 간결하지만, 그것은 나에게 상당한 시간이 걸렸습니다와 나는 마침내 여기까지 얻을 수 있었다 전에 많은 리팩토링 전달합니다. 당신이 그것을 개선 할 수있는 방법을 볼 수 있으면 알려 주시기 바랍니다.

    업데이트 : 난 그냥 자신에 StackOverflow의 질문으로 아래의 문제를 요구했다. 그리고 지금 아래에 언급 된 오류를 수정 답변이 있습니다.

    나는 노력이 훨씬 더 일반적인 아래의 새 제네릭는 ified 함수 정의와 transformLine에 매개 변수 retainValues을 변경 할 수있는 생각을했다. 그러나, 나는 인 IntelliJ의 하이라이트 오류가 "일부는 [목록 [문자열] 예상 유형 옵션 [A]을 준수하지 않는 유형의 표현"을 유지하고 오류 때문에 기본값을 변경하는 방법을 알아낼 수 없습니다 가버 리다.

    def tryProcessSource2[A <: AnyRef](
      file: File,
      parseLine: (Int, String) => Option[List[String]] =
        (index, unparsedLine) => Some(List(unparsedLine)),
      filterLine: (Int, List[String]) => Option[Boolean] =
        (index, parsedValues) => Some(true),
      transformLine: (Int, List[String]) => Option[A] =
        (index, parsedValues) => Some(parsedValues)
    ): Try[List[A]] = {
      ???
    }
    

    이 작업을하는 방법에 대한 도움을 주시면 감사하겠습니다.

  4. from https://stackoverflow.com/questions/4255021/how-do-i-read-a-large-csv-file-with-scala-stream-class by cc-by-sa and MIT license