[SCALA] 플레이 2.X : Iteratees와 반응성 파일 업로드
SCALA플레이 2.X : Iteratees와 반응성 파일 업로드
클라우드 스토리지에 파일 업로드 스칼라 API의 Iteratee를 사용하는 방법 (내 경우에 푸른 물방울 스토리지,하지만 난 지금이 가장 중요하다고 생각하지 않습니다) : 나는 질문으로 시작합니다
배경:
나는 푸른의 BlockBlobs 큰 미디어 파일 (300메가바이트 +)를 저장하기 위해 청크 1에 대한 MB의 블록으로 입력이 필요합니다. 불행하게도, 내 스칼라 지식 (내 프로젝트는 자바를 기반으로하고 거기에 스칼라의 유일한 사용이 업로드 컨트롤러 예정) 여전히 좋지 않습니다.
이 코드로 시도 : 왜 호출하게 오류 또는 BodyParser의 Iteratee에 요청 플레이 프레임 워크 2.0에 걸어 일을? (A 입력 Iteratee로) - 그것은 아주 잘하지만 eachElement 내가 사용할 수 있다는 작동이 클라우드로 몇 백 메가 바이트 파일을 전송 너무 작은, 그래서 8192 바이트의 크기를 갖는다.
나는 나에게 아주 새로운 접근 방식, 그리고 대부분의 아마 내가 오해 뭔가 말해야한다 (즉, I 오해의 모든 것을 말하고 싶어하지 않는다>)
나는 어떤 힌트를 감사하거나 주제 나에게 도움이됩니다 연결됩니다. 유사한 사용의 샘플이 있다면 그것은 나를 생각을위한 최선의 선택이 될 것입니다.
해결법
-
==============================
1.기본적으로 가장 먼저 필요한 것은 더 큰 덩어리, 1024 * 1024 바이트로 rechunk 입력입니다.
기본적으로 가장 먼저 필요한 것은 더 큰 덩어리, 1024 * 1024 바이트로 rechunk 입력입니다.
먼저하자가 (지난 덩어리 작은 가지고 확인) 바이트의 1m까지 소모 될 것 Iteratee이
val consumeAMB = Traversable.takeUpTo[Array[Byte]](1024*1024) &>> Iteratee.consume()
그 사용하여, 우리는 그룹화라는 API를 사용하여 덩어리를 재편성 것 Enumeratee (어댑터)를 구성 할 수 있습니다 :
val rechunkAdapter:Enumeratee[Array[Byte],Array[Byte]] = Enumeratee.grouped(consumeAMB)
여기에 각 청크에 넣어 양을 결정하는 데 사용에게 Iteratee를 그룹화합니다. 그것은 그것을 위해 우리의 consumeAMB을 사용합니다. 어느 결과 배열에 입력 [바이트]가 1MB를 rechunks Enumeratee 의미한다.
이제 우리는 바이트의 각 청크를 보낼 Iteratee.foldM 방법을 사용하는 BodyParser을 작성해야합니다 :
val writeToStore: Iteratee[Array[Byte],_] = Iteratee.foldM[Array[Byte],_](connectionHandle){ (c,bytes) => // write bytes and return next handle, probable in a Future }
foldM가 전달 함수에 따라 사용 상태를 전달 (S 입력 [배열 [바이트]]) => 미래 [S]는 상태의 새로운 미래를 반환한다. 미래가 완료 될 때까지 foldM 다시 함수를 호출하지 않으며 입력 가능한 덩어리가있다.
그리고 몸 파서는 입력을 rechunking하고 저장소에 밀어 것입니다 :
BodyParser( rh => (rechunkAdapter &>> writeToStore).map(Right(_)))
권리를 반환하면 (핸들러 여기를 될 일이) 몸 구문 분석 말까지 몸을 반환하는 것을 나타냅니다.
-
==============================
2.당신의 목표는 S3에 스트림 인 경우, 여기에 내가 구현 및 테스트 한 것을 도우미입니다 :
당신의 목표는 S3에 스트림 인 경우, 여기에 내가 구현 및 테스트 한 것을 도우미입니다 :
def uploadStream(bucket: String, key: String, enum: Enumerator[Array[Byte]]) (implicit ec: ExecutionContext): Future[CompleteMultipartUploadResult] = { import scala.collection.JavaConversions._ val initRequest = new InitiateMultipartUploadRequest(bucket, key) val initResponse = s3.initiateMultipartUpload(initRequest) val uploadId = initResponse.getUploadId val rechunker: Enumeratee[Array[Byte], Array[Byte]] = Enumeratee.grouped { Traversable.takeUpTo[Array[Byte]](5 * 1024 * 1024) &>> Iteratee.consume() } val uploader = Iteratee.foldM[Array[Byte], Seq[PartETag]](Seq.empty) { case (etags, bytes) => val uploadRequest = new UploadPartRequest() .withBucketName(bucket) .withKey(key) .withPartNumber(etags.length + 1) .withUploadId(uploadId) .withInputStream(new ByteArrayInputStream(bytes)) .withPartSize(bytes.length) val etag = Future { s3.uploadPart(uploadRequest).getPartETag } etag.map(etags :+ _) } val futETags = enum &> rechunker |>>> uploader futETags.map { etags => val compRequest = new CompleteMultipartUploadRequest(bucket, key, uploadId, etags.toBuffer[PartETag]) s3.completeMultipartUpload(compRequest) }.recoverWith { case e: Exception => s3.abortMultipartUpload(new AbortMultipartUploadRequest(bucket, key, uploadId)) Future.failed(e) } }
-
==============================
3.설정 파일에 다음을 추가
설정 파일에 다음을 추가
play.http.parser.maxMemoryBuffer = 256K
-
==============================
4.대신 완전히 새로운 BodyParser을 쓰는이 스트리밍 문제의 해결책을 알아 내려고 노력하는 사람들을 위해, 당신은 이미 parse.multipartFormData에 구현 된 내용을 사용할 수 있습니다. 당신은 기본 핸들러 handleFilePartAsTemporaryFile을 덮어 다음과 같이 구현할 수 있습니다.
대신 완전히 새로운 BodyParser을 쓰는이 스트리밍 문제의 해결책을 알아 내려고 노력하는 사람들을 위해, 당신은 이미 parse.multipartFormData에 구현 된 내용을 사용할 수 있습니다. 당신은 기본 핸들러 handleFilePartAsTemporaryFile을 덮어 다음과 같이 구현할 수 있습니다.
def handleFilePartAsS3FileUpload: PartHandler[FilePart[String]] = { handleFilePart { case FileInfo(partName, filename, contentType) => (rechunkAdapter &>> writeToS3).map { _ => val compRequest = new CompleteMultipartUploadRequest(...) amazonS3Client.completeMultipartUpload(compRequest) ... } } } def multipartFormDataS3: BodyParser[MultipartFormData[String]] = multipartFormData(handleFilePartAsS3FileUpload)
나는이 일을 할 수 있어요하지만 난하지 않도록 전체 업로드 프로세스 스트리밍 여부를 아직입니다. 나는 전체 파일이 클라이언트 측에서 전송 된 경우 S3는 업로드 시작 것, 몇 가지 큰 파일을 시도했다.
나는 위의 파서 구현을 보면서 나는 파일을 스트리밍 할 수 있어야하므로 모든 Iteratee를 사용하여 연결되어 생각합니다. 사람이 몇 가지 통찰력을 가지고 있다면, 그것은 매우 도움이 될 것입니다.
from https://stackoverflow.com/questions/11916911/play-2-x-reactive-file-upload-with-iteratees by cc-by-sa and MIT license
'SCALA' 카테고리의 다른 글
[SCALA] 스파크 - 응용 프로그램을 제출할 때 오류 "A 마스터 URL이 구성에서 설정해야합니다" (0) | 2019.11.19 |
---|---|
[SCALA] 어떻게 스칼라에서 I 설정 여러 유형의 경계는 무엇입니까? (0) | 2019.11.19 |
[SCALA] 어떻게 스파크의 분류에 대한 정확한 데이터 프레임을 만들 수 있습니다 ML (0) | 2019.11.18 |
[SCALA] 매개 변수 목록에서 경우 클래스를 인스턴스화 (0) | 2019.11.18 |
[SCALA] 주장을 테스트하는 것은 무엇인가 컴파일되지해야 (0) | 2019.11.18 |