복붙노트

[HADOOP] Scalding에서 버켓 출력 방법

HADOOP

Scalding에서 버켓 출력 방법

다른 디렉토리에 파이프를 출력하려고하는데, 각 디렉토리의 출력은 일부 ID를 기반으로 버킷으로 처리됩니다. 그래서 plain map reduce 코드에서 MultipleOutputs 클래스를 사용할 것이고 나는 감속기에서 이와 같은 것을 할 것입니다.

protected void reduce(final SomeKey key,
      final Iterable<SomeValue> values,
      final Context context) {

   ...
   for (SomeValue value: values) {
     String bucketId = computeBucketIdFrom(...);
     multipleOutputs.write(key, value, folderName + "/" + bucketId);
   ...

그래서 나는 사람이 이것을 끓일 때 이렇게 할 수 있다고 생각합니다.

...
  val somePipe = Csv(in, separator = "\t",
        fields = someSchema,
        skipHeader = true)
    .read

  for (i <- 1 until numberOfBuckets) {
    somePipe
    .filter('someId) {id: String => (id.hashCode % numberOfBuckets) == i}
    .write(Csv(out + "/bucket" + i ,
      writeHeader = true,
      separator = "\t"))
  }

하지만 여러 번 같은 파이프를 읽는다면 전체 성능에 영향을 미칠 것입니다.

다른 대안이 있습니까?

감사

해결법

  1. ==============================

    1.예, 물론 TemplatedTsv를 사용하는 더 좋은 방법이 있습니다.

    예, 물론 TemplatedTsv를 사용하는 더 좋은 방법이 있습니다.

    위의 코드는 다음과 같이 작성할 수 있습니다.

    val somePipe = Tsv(in, fields = someSchema, skipHeader = true)
        .read
        .write(TemplatedTsv(out, "%s", 'some_id, writeHeader = true))
    

    그러면 'some_id'에서 오는 모든 레코드를 out / some_ids 폴더 아래의 별도 폴더에 넣을 수 있습니다.

    그러나 정수 버킷을 만들 수도 있습니다. 마지막 줄 바꾸기,

    .map('some_id -> 'bucket) { id: String => id.hashCode % numberOfBuckets }    
    .write(TemplatedTsv(out, "%02d", 'bucket, writeHeader = true, fields = ('all except 'bucket)))
    

    그러면 out / dd /로 두 자릿수 폴더가 생성됩니다. templatedTsv API도 여기에서 확인할 수 있습니다.

    감속기가 결과를 사용하여 다음 작업에 좋지 않은 작은 파일을 많이 생성 할 수있는 templatedTsv를 사용하면 작은 문제가 발생할 수 있습니다. 따라서 디스크에 쓰기 전에 템플릿 필드를 정렬하는 것이 좋습니다. 나는 그것에 관해 그것에 관해 blog를 여기에서 썼다.

  2. from https://stackoverflow.com/questions/28357987/how-to-bucket-outputs-in-scalding by cc-by-sa and MIT license