복붙노트

[SCALA] 플레이 2.X : Iteratees와 반응성 파일 업로드

SCALA

플레이 2.X : Iteratees와 반응성 파일 업로드

클라우드 스토리지에 파일 업로드 스칼라 API의 Iteratee를 사용하는 방법 (내 경우에 푸른 물방울 스토리지,하지만 난 지금이 ​​가장 중요하다고 생각하지 않습니다) : 나는 질문으로 시작합니다

배경:

나는 푸른의 BlockBlobs 큰 미디어 파일 (300메가바이트 +)를 저장하기 위해 청크 1에 대한 MB의 블록으로 입력이 필요합니다. 불행하게도, 내 스칼라 지식 (내 프로젝트는 자바를 기반으로하고 거기에 스칼라의 유일한 사용이 업로드 컨트롤러 예정) 여전히 좋지 않습니다.

이 코드로 시도 : 왜 호출하게 오류 또는 BodyParser의 Iteratee에 요청 플레이 프레임 워크 2.0에 걸어 일을? (A 입력 Iteratee로) - 그것은 아주 잘하지만 eachElement 내가 사용할 수 있다는 작동이 클라우드로 몇 백 메가 바이트 파일을 전송 너무 작은, 그래서 8192 바이트의 크기를 갖는다.

나는 나에게 아주 새로운 접근 방식, 그리고 대부분의 아마 내가 오해 뭔가 말해야한다 (즉, I 오해의 모든 것을 말하고 싶어하지 않는다>)

나는 어떤 힌트를 감사하거나 주제 나에게 도움이됩니다 연결됩니다. 유사한 사용의 샘플이 있다면 그것은 나를 생각을위한 최선의 선택이 될 것입니다.

해결법

  1. ==============================

    1.기본적으로 가장 먼저 필요한 것은 더 큰 덩어리, 1024 * 1024 바이트로 rechunk 입력입니다.

    기본적으로 가장 먼저 필요한 것은 더 큰 덩어리, 1024 * 1024 바이트로 rechunk 입력입니다.

    먼저하자가 (지난 덩어리 작은 가지고 확인) 바이트의 1m까지 소모 될 것 Iteratee이

    val consumeAMB = 
      Traversable.takeUpTo[Array[Byte]](1024*1024) &>> Iteratee.consume()
    

    그 사용하여, 우리는 그룹화라는 API를 사용하여 덩어리를 재편성 것 Enumeratee (어댑터)를 구성 할 수 있습니다 :

    val rechunkAdapter:Enumeratee[Array[Byte],Array[Byte]] =
      Enumeratee.grouped(consumeAMB)
    

    여기에 각 청크에 넣어 양을 결정하는 데 사용에게 Iteratee를 그룹화합니다. 그것은 그것을 위해 우리의 consumeAMB을 사용합니다. 어느 결과 배열에 입력 [바이트]가 1MB를 rechunks Enumeratee 의미한다.

    이제 우리는 바이트의 각 청크를 보낼 Iteratee.foldM 방법을 사용하는 BodyParser을 작성해야합니다 :

    val writeToStore: Iteratee[Array[Byte],_] =
      Iteratee.foldM[Array[Byte],_](connectionHandle){ (c,bytes) => 
        // write bytes and return next handle, probable in a Future
      }
    

    foldM가 전달 함수에 따라 사용 상태를 전달 (S 입력 [배열 [바이트]]) => 미래 [S]는 상태의 새로운 미래를 반환한다. 미래가 완료 될 때까지 foldM 다시 함수를 호출하지 않으며 입력 가능한 덩어리가있다.

    그리고 몸 파서는 입력을 rechunking하고 저장소에 밀어 것입니다 :

    BodyParser( rh => (rechunkAdapter &>> writeToStore).map(Right(_)))
    

    권리를 반환하면 (핸들러 여기를 될 일이) 몸 구문 분석 말까지 몸을 반환하는 것을 나타냅니다.

  2. ==============================

    2.당신의 목표는 S3에 스트림 인 경우, 여기에 내가 구현 및 테스트 한 것을 도우미입니다 :

    당신의 목표는 S3에 스트림 인 경우, 여기에 내가 구현 및 테스트 한 것을 도우미입니다 :

    def uploadStream(bucket: String, key: String, enum: Enumerator[Array[Byte]])
                    (implicit ec: ExecutionContext): Future[CompleteMultipartUploadResult] = {
      import scala.collection.JavaConversions._
    
      val initRequest = new InitiateMultipartUploadRequest(bucket, key)
      val initResponse = s3.initiateMultipartUpload(initRequest)
      val uploadId = initResponse.getUploadId
    
      val rechunker: Enumeratee[Array[Byte], Array[Byte]] = Enumeratee.grouped {
        Traversable.takeUpTo[Array[Byte]](5 * 1024 * 1024) &>> Iteratee.consume()
      }
    
      val uploader = Iteratee.foldM[Array[Byte], Seq[PartETag]](Seq.empty) { case (etags, bytes) =>
        val uploadRequest = new UploadPartRequest()
          .withBucketName(bucket)
          .withKey(key)
          .withPartNumber(etags.length + 1)
          .withUploadId(uploadId)
          .withInputStream(new ByteArrayInputStream(bytes))
          .withPartSize(bytes.length)
    
        val etag = Future { s3.uploadPart(uploadRequest).getPartETag }
        etag.map(etags :+ _)
      }
    
      val futETags = enum &> rechunker |>>> uploader
    
      futETags.map { etags =>
        val compRequest = new CompleteMultipartUploadRequest(bucket, key, uploadId, etags.toBuffer[PartETag])
        s3.completeMultipartUpload(compRequest)
      }.recoverWith { case e: Exception =>
        s3.abortMultipartUpload(new AbortMultipartUploadRequest(bucket, key, uploadId))
        Future.failed(e)
      }
    
    }
    
  3. ==============================

    3.설정 파일에 다음을 추가

    설정 파일에 다음을 추가

    play.http.parser.maxMemoryBuffer = 256K

  4. ==============================

    4.대신 완전히 새로운 BodyParser을 쓰는이 스트리밍 문제의 해결책을 알아 내려고 노력하는 사람들을 위해, 당신은 이미 parse.multipartFormData에 구현 된 내용을 사용할 수 있습니다. 당신은 기본 핸들러 handleFilePartAsTemporaryFile을 덮어 다음과 같이 구현할 수 있습니다.

    대신 완전히 새로운 BodyParser을 쓰는이 스트리밍 문제의 해결책을 알아 내려고 노력하는 사람들을 위해, 당신은 이미 parse.multipartFormData에 구현 된 내용을 사용할 수 있습니다. 당신은 기본 핸들러 handleFilePartAsTemporaryFile을 덮어 다음과 같이 구현할 수 있습니다.

    def handleFilePartAsS3FileUpload: PartHandler[FilePart[String]] = {
      handleFilePart {
        case FileInfo(partName, filename, contentType) =>
    
          (rechunkAdapter &>> writeToS3).map {
            _ =>
              val compRequest = new CompleteMultipartUploadRequest(...)
              amazonS3Client.completeMultipartUpload(compRequest)
              ...
          }
      }
    }
    
    def multipartFormDataS3: BodyParser[MultipartFormData[String]] = multipartFormData(handleFilePartAsS3FileUpload)
    

    나는이 일을 할 수 있어요하지만 난하지 않도록 전체 업로드 프로세스 스트리밍 여부를 아직입니다. 나는 전체 파일이 클라이언트 측에서 전송 된 경우 S3는 업로드 시작 것, 몇 가지 큰 파일을 시도했다.

    나는 위의 파서 구현을 보면서 나는 파일을 스트리밍 할 수 있어야하므로 모든 Iteratee를 사용하여 연결되어 생각합니다. 사람이 몇 가지 통찰력을 가지고 있다면, 그것은 매우 도움이 될 것입니다.

  5. from https://stackoverflow.com/questions/11916911/play-2-x-reactive-file-upload-with-iteratees by cc-by-sa and MIT license