복붙노트

[HADOOP] sc.textFile ()을 사용하여 스파크 할 로컬 파일로드

HADOOP

sc.textFile ()을 사용하여 스파크 할 로컬 파일로드

sc.textFile을 사용하여 로컬 파일 시스템에서 Spark로 파일을로드하는 방법? -env 변수를 변경해야합니까? 또한 Hadoop이 설치되어 있지 않은 창에서 동일한 작업을 시도 할 때도 같은 오류가 발생합니다.

> val inputFile = sc.textFile("file///C:/Users/swaapnika/Desktop/to do list")
/17 22:28:18 INFO MemoryStore: ensureFreeSpace(63280) called with curMem=0, maxMem=278019440
/17 22:28:18 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 61.8 KB, free 265.1 MB)
/17 22:28:18 INFO MemoryStore: ensureFreeSpace(19750) called with curMem=63280, maxMem=278019440
/17 22:28:18 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 19.3 KB, free 265.1 MB)
/17 22:28:18 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on localhost:53659 (size: 19.3 KB, free: 265.1 MB)
/17 22:28:18 INFO SparkContext: Created broadcast 0 from textFile at <console>:21
File: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[1] at textFile at <console>:21

> val words = input.flatMap(line => line.split(" "))
ole>:19: error: not found: value input
  val words = input.flatMap(line => line.split(" "))
              ^

> val words = inputFile.flatMap(line => line.split(" "))
: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[2] at flatMap at <console>:23

> val counts = words.map(word => (word, 1)).reduceByKey{case (x, y) => x + y}
apache.hadoop.mapred.InvalidInputException: Input path does not exist: file:/c:/spark-1.4.1-bin-hadoop2.6/bin/file/C:/Users/swaapnika/Desktop/to do list
   at org.apache.hadoop.mapred.FileInputFormat.singleThreadedListStatus(FileInputFormat.java:285)
   at org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:228)
   at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:313)
   at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:207)
   at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)
   at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)
   at scala.Option.getOrElse(Option.scala:120)
   at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
   at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:32)
   at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)
   at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)
   at scala.Option.getOrElse(Option.scala:120)
   at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
   at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:32)
   at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)
   at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)
   at scala.Option.getOrElse(Option.scala:120)
   at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
   at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:32)
   at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)
   at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)
   at scala.Option.getOrElse(Option.scala:120)
   at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
   at org.apache.spark.Partitioner$.defaultPartitioner(Partitioner.scala:65)
   at org.apache.spark.rdd.PairRDDFunctions$$anonfun$reduceByKey$3.apply(PairRDDFunctions.scala:290)
   at org.apache.spark.rdd.PairRDDFunctions$$anonfun$reduceByKey$3.apply(PairRDDFunctions.scala:290)
   at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
   at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108)
   at org.apache.spark.rdd.RDD.withScope(RDD.scala:286)
   at org.apache.spark.rdd.PairRDDFunctions.reduceByKey(PairRDDFunctions.scala:289)
   at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:25)
   at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:30)
   at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:32)
   at $iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:34)
   at $iwC$$iwC$$iwC$$iwC.<init>(<console>:36)
   at $iwC$$iwC$$iwC.<init>(<console>:38)
   at $iwC$$iwC.<init>(<console>:40)
   at $iwC.<init>(<console>:42)
   at <init>(<console>:44)
   at .<init>(<console>:48)
   at .<clinit>(<console>)
   at .<init>(<console>:7)
   at .<clinit>(<console>)
   at $print(<console>)
   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
   at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
   at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
   at java.lang.reflect.Method.invoke(Method.java:497)
   at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:1065)
   at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1338)
   at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:840)
   at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:871)
   at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:819)
   at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:857)
   at org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:902)
   at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:814)
   at org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:657)
   at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:665)
   at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$loop(SparkILoop.scala:670)
   at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply$mcZ$sp(SparkILoop.scala:997)
   at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:945)
   at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:945)
   at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
   at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$process(SparkILoop.scala:945)
   at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:1059)
   at org.apache.spark.repl.Main$.main(Main.scala:31)
   at org.apache.spark.repl.Main.main(Main.scala)
   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
   at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
   at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
   at java.lang.reflect.Method.invoke(Method.java:497)
   at 

org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:665)
   at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:170)
   at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:193)
   at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:112)
   at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)


>

해결법

  1. ==============================

    1.나는 모든 의존성과 환경 변수를 다시 검사했다. hadoop env.sh 파일의 기본 파일 시스템은 fs.defaultFs로 설정되어 있으므로 실제 경로 "file : ///home/..../ ...txt"는 로컬 파일 시스템에서 데이터를 가져옵니다. 변경없이 Spark-env.sh를 기본값으로두면 경로가 "hdfs : // .."일 때 "file : // ..."을 만나면 로컬 파일 시스템을 사용하고 hdfs를 사용합니다. 특정 파일 시스템에서 특히 HADOOP_CONF_DIR을 spark-env.sh로 내 보내야하는 경우 또한 Hadoop이 지원하는 모든 파일 시스템을 지원합니다. 이것은 나의 관찰이었다. 허용 된 모든 수정이나 제안. 고맙습니다

    나는 모든 의존성과 환경 변수를 다시 검사했다. hadoop env.sh 파일의 기본 파일 시스템은 fs.defaultFs로 설정되어 있으므로 실제 경로 "file : ///home/..../ ...txt"는 로컬 파일 시스템에서 데이터를 가져옵니다. 변경없이 Spark-env.sh를 기본값으로두면 경로가 "hdfs : // .."일 때 "file : // ..."을 만나면 로컬 파일 시스템을 사용하고 hdfs를 사용합니다. 특정 파일 시스템에서 특히 HADOOP_CONF_DIR을 spark-env.sh로 내 보내야하는 경우 또한 Hadoop이 지원하는 모든 파일 시스템을 지원합니다. 이것은 나의 관찰이었다. 허용 된 모든 수정이나 제안. 고맙습니다

  2. ==============================

    2.변경 시도

    변경 시도

    val inputFile = sc.textFile("file///C:/Users/swaapnika/Desktop/to do list")
    

    이에:

    val inputFile = sc.textFile("file:///Users/swaapnika/Desktop/to do list")
    

    나는 또한 hadoop과 spark에 상당히 익숙하지만, Windows에서 로컬로 spark를 실행할 때, sc : file ///은 sc.textFile에 전달 될 때 이미 C : \를 참조합니다.

  3. ==============================

    3.정의한 파일 경로가 올바르지 않습니다.

    정의한 파일 경로가 올바르지 않습니다.

    변경 시도

    sc.textFile("file///C:/Users/swaapnika/Desktop/to do list")
    

    sc.textFile("file://C:/Users/swaapnika/Desktop/to do list")
    

    또는

    sc.textFile("C:/Users/swaapnika/Desktop/to do list") 
    
  4. ==============================

    4.이 오류는 클러스터에서 spark를 실행할 때 발생합니다. 클러스터를 시작하기 위해 작업을 제출하면 클러스터 관리자 (YARN 또는 Mesos 또는 기타)가이를 작업자 노드에 제출합니다. 작업자 노드가 스파크로로드해야하는 파일의 경로를 찾으려고 할 때 작업자가 그러한 파일을 가지고 있지 않기 때문에 실패합니다. 그래서 로컬 모드에서 spark-shell을 실행하고 다시 시도해보십시오.

    이 오류는 클러스터에서 spark를 실행할 때 발생합니다. 클러스터를 시작하기 위해 작업을 제출하면 클러스터 관리자 (YARN 또는 Mesos 또는 기타)가이를 작업자 노드에 제출합니다. 작업자 노드가 스파크로로드해야하는 파일의 경로를 찾으려고 할 때 작업자가 그러한 파일을 가지고 있지 않기 때문에 실패합니다. 그래서 로컬 모드에서 spark-shell을 실행하고 다시 시도해보십시오.

    \ bin \ spark-shell - 마스터 로컬

    sc.textFile ( "file : /// C : / Users / swaapnika / Desktop / to do list")

    이게 도움이되는지 알려주세요.

  5. from https://stackoverflow.com/questions/32064280/load-a-local-file-to-spark-using-sc-textfile by cc-by-sa and MIT license