복붙노트

[HADOOP] Hadoop 버전 2.7.2를 사용하여 Spark에서 S3a 프로토콜을 사용하여 S3에 액세스하기

HADOOP

Hadoop 버전 2.7.2를 사용하여 Spark에서 S3a 프로토콜을 사용하여 S3에 액세스하기

pyspark (버전 2.2.0)에서 s3 (s3a 프로토콜)에 액세스하려고하는데 어려움을 겪고 있습니다.

Hadoop 및 AWS SDK 패키지를 사용하고 있습니다.

pyspark --packages com.amazonaws:aws-java-sdk-pom:1.10.34,org.apache.hadoop:hadoop-aws:2.7.2

내 코드는 다음과 같습니다.

sc._jsc.hadoopConfiguration().set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
sc._jsc.hadoopConfiguration().set("fs.s3a.access.key", AWS_ACCESS_KEY_ID)
sc._jsc.hadoopConfiguration().set("fs.s3a.secret.key", AWS_SECRET_ACCESS_KEY)

rdd = sc.textFile('s3a://spark-test-project/large-file.csv')
print(rdd.first().show())

나는 이것을 얻는다 :

Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/Users/attazadeh/DataEngine/env/lib/python3.4/site-packages/pyspark/rdd.py", line 1361, in first
    rs = self.take(1)
  File "/Users/attazadeh/DataEngine/env/lib/python3.4/site-packages/pyspark/rdd.py", line 1313, in take
    totalParts = self.getNumPartitions()
  File "/Users/attazadeh/DataEngine/env/lib/python3.4/site-packages/pyspark/rdd.py", line 385, in getNumPartitions
    return self._jrdd.partitions().size()
  File "/Users/attazadeh/DataEngine/env/lib/python3.4/site-packages/pyspark/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py", line 1133, in __call__
  File "/Users/attazadeh/DataEngine/env/lib/python3.4/site-packages/pyspark/sql/utils.py", line 63, in deco
    return f(*a, **kw)
  File "/Users/attazadeh/DataEngine/env/lib/python3.4/site-packages/pyspark/python/lib/py4j-0.10.4-src.zip/py4j/protocol.py", line 319, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o34.partitions.
: com.amazonaws.services.s3.model.AmazonS3Exception: Status Code: 400, AWS Service: Amazon S3, AWS Request ID: 32750D3DED4067BD, AWS Error Code: null, AWS Error Message: Bad Request, S3 Extended Request ID: jAhO0tWTblPEUehF1Bul9WZj/9G7woaHFVxb8gzsOpekam82V/Rm9zLgdLDNsGZ6mPizGZmo6xI=
    at com.amazonaws.http.AmazonHttpClient.handleErrorResponse(AmazonHttpClient.java:798)
    at com.amazonaws.http.AmazonHttpClient.executeHelper(AmazonHttpClient.java:421)
    at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:232)
    at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:3528)
    at com.amazonaws.services.s3.AmazonS3Client.headBucket(AmazonS3Client.java:1031)
    at com.amazonaws.services.s3.AmazonS3Client.doesBucketExist(AmazonS3Client.java:994)
    at org.apache.hadoop.fs.s3a.S3AFileSystem.initialize(S3AFileSystem.java:297)
    at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2669)
    at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:94)
    at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2703)
    at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2685)
    at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:373)
    at org.apache.hadoop.fs.Path.getFileSystem(Path.java:295)
    at org.apache.hadoop.mapred.FileInputFormat.singleThreadedListStatus(FileInputFormat.java:258)
    at org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:229)
    at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:315)
    at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:194)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:252)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:250)
    at scala.Option.getOrElse(Option.scala:121)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:250)
    at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:252)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:250)
    at scala.Option.getOrElse(Option.scala:121)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:250)
    at org.apache.spark.api.java.JavaRDDLike$class.partitions(JavaRDDLike.scala:61)
    at org.apache.spark.api.java.AbstractJavaRDDLike.partitions(JavaRDDLike.scala:45)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:280)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:214)
    at java.lang.Thread.run(Thread.java:748)

AWS Java SDK의 버그입니까? AWS 이외의 AWS에서 더 나은 로깅 정보를 얻을 수있는 방법이 있는지 모르겠다. 오류 코드 : null

해결법

  1. ==============================

    1.그것이 가치가있는 것을 위해, 나는 aws에 나의 spark-defaults.conf 파일에 다음 줄을 가지고있다 :

    그것이 가치가있는 것을 위해, 나는 aws에 나의 spark-defaults.conf 파일에 다음 줄을 가지고있다 :

    spark.jars.packages com.amazonaws:aws-java-sdk:1.11.99,org.apache.hadoop:hadoop-aws:2.7.2
    

    또한 EC2를 설정할 때 사용하는 보안 그룹이 s3에 액세스 할 수 있는지 확인했습니다.

    그 두 가지 후에, s3에서 파일을 읽는 데 문제가 없었습니다.

    %pyspark
    df = spark.read.csv("s3a://my_bucket/name/")
    

    또는 AWS EMR을 사용하는 경우 즉시 상자에 s3에 액세스 할 수 있어야합니다.

    %pyspark
    df = spark.read.csv("s3://my_bucket/name/")
    
  2. ==============================

    2."잘못된 요청"은 S3에서 두려워 할 메시지입니다.이 메시지는 "작동하지 않았으므로 그 이유를 알려주지 않습니다"라는 의미입니다.

    "잘못된 요청"은 S3에서 두려워 할 메시지입니다.이 메시지는 "작동하지 않았으므로 그 이유를 알려주지 않습니다"라는 의미입니다.

    S3A 문제 해결에 대한 전체 섹션이 문서에 있습니다.

    버킷이 S3 "v4"인증 프로토콜 (frankfurt, london, seoul) 만 지원하는 호스트라면 fs.s3a.endpoint 필드를 특정 영역의 필드로 설정해야합니다. 문서에 세부 정보가 있습니다.

    그렇지 않으면 s3a : //landsat-pds/scene_list.gz를 소스로 사용해보십시오. 그것은 인증이 필요없는 공개 CSV 파일입니다. 너가 그것을 볼 수 없다면, 너는 심각한 문제에 빠져있다.

  3. from https://stackoverflow.com/questions/45968326/accessing-s3-using-s3a-protocol-from-spark-using-hadoop-version-2-7-2 by cc-by-sa and MIT license