[HADOOP] HDFS는 : java.io.FileNotFoundException : 파일이 존재하지 않습니다 name._COPYING를
HADOOPHDFS는 : java.io.FileNotFoundException : 파일이 존재하지 않습니다 name._COPYING를
나는 스칼라를 사용하여 불꽃 스트리밍 함께 일하고 있어요. 나는이 라인 HDFS 디렉토리에서 dinamically .csv 파일을 읽을 필요가 :
val lines = ssc.textFileStream("/user/root/")
나는 HDFS에 파일을 넣어 다음 명령 줄을 사용하십시오
hdfs dfs -put ./head40k.csv
그것은 상대적으로 작은 파일과 함께 작동합니다. 나는 더 큰 하나를하려고하면이 오류를 얻을 :
org.apache.hadoop.ipc.RemoteException(java.io.FileNotFoundException): File does not exist: /user/root/head800k.csv._COPYING
그 이유를 이해할 수있다,하지만 난 그것을 해결하는 방법을 모르겠어요. 나도이 솔루션을 시도했다 :
hdfs dfs -put ./head800k.csv /user
hdfs dfs -mv /usr/head800k.csv /user/root
하지만 내 프로그램은 파일을 읽지 않습니다. 어떤 아이디어? 미리 감사드립니다
프로그램:
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.mllib.rdd.RDDFunctions._
import scala.sys.process._
import org.apache.spark.mllib.linalg.Vectors
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import java.util.HashMap
import org.apache.hadoop.io.{LongWritable, NullWritable, Text}
import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
import kafka.serializer.StringDecoder
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka._
import org.apache.spark.SparkConf
import StreamingContext._
object Traccia2014{
def main(args: Array[String]){
if (args.length < 2) {
System.err.println(s"""
|Usage: DirectKafkaWordCount <brokers> <test><topicRisultato>
| <brokers> is a list of one or more Kafka brokers
| <topics> is a list of one or more kafka topics to consume from
|
""".stripMargin)
System.exit(1)
}
val Array(brokers,risultato) = args
val sparkConf = new SparkConf().setAppName("Traccia2014")
val ssc = new StreamingContext(sparkConf, Seconds(5))
val lines = ssc.textFileStream("/user/root/")
//val lines= ssc.fileStream[LongWritable, Text, TextInputFormat](directory="/user/root/",
// filter = (path: org.apache.hadoop.fs.Path) => //(!path.getName.endsWith("._COPYING")),newFilesOnly = true)
//********** Definizioni Producer***********
val props = new HashMap[String, Object]()
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer")
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer")
val producer = new KafkaProducer[String, String](props)
val slice=30
lines.foreachRDD( rdd => {
if(!rdd.isEmpty){
val min=rdd.map(x => x.split(",")(0)).reduce((a, b) => if (a < b) a else b)
if(!min.isEmpty){
val ipDst= rdd.map(x => (((x.split(",")(0).toInt - min.toInt).toLong/slice).round*slice+" "+(x.split(",")(2)),1)).reduceByKey(_ + _)
if(!ipDst.isEmpty){
val ipSrc=rdd.map(x => (((x.split(",")(0).toInt - min.toInt).toLong/slice).round*slice+" "+(x.split(",")(1)),1)).reduceByKey(_ + _)
if(!ipSrc.isEmpty){
val Rapporto=ipSrc.leftOuterJoin(ipDst).mapValues{case (x,y) => x.asInstanceOf[Int] / y.getOrElse(1) }
val RapportoFiltrato=Rapporto.filter{case (key, value) => value > 100 }
println("###(ConsumerScala) CalcoloRapporti: ###")
Rapporto.collect().foreach(println)
val str = Rapporto.collect().mkString("\n")
println(s"###(ConsumerScala) Produco Risultato : ${str}")
val message = new ProducerRecord[String, String](risultato, null, str)
producer.send(message)
Thread.sleep(1000)
}else{
println("src vuoto")
}
}else{
println("dst vuoto")
}
}else{
println("min vuoto")
}
}else
{
println("rdd vuoto")
}
})//foreach
ssc.start()
ssc.awaitTermination()
} }
해결법
-
==============================
1./user/root/head800k.csv._COPYING는 복사 프로세스가가는 동안 생성되는 임시 파일입니다. 완료 복사 프로세스 기다린 당신은이 _COPYING 접미사 즉 /user/root/head800k.csv없이 실패 할 것이다.
/user/root/head800k.csv._COPYING는 복사 프로세스가가는 동안 생성되는 임시 파일입니다. 완료 복사 프로세스 기다린 당신은이 _COPYING 접미사 즉 /user/root/head800k.csv없이 실패 할 것이다.
당신의 스파크 스트리밍 작업에서 이러한 과도를 필터링하는 데 여기에 설명 된 파일 스트림 방법을 사용할 수 있습니다 예를 들어 다음과 같이
ssc.fileStream[LongWritable, Text, TextInputFormat]( directory="/user/root/", filter = (path: org.apache.hadoop.fs.Path) => (!path.getName.endsWith("_COPYING")), // add other filters like files starting with dot etc newFilesOnly = true)
편집하다
당신은 HDFS에 로컬 파일 시스템에서 파일을 이동하기 때문에, 가장 좋은 방법은 HDFS에 임시 준비 위치로 파일을 이동 한 다음 대상 디렉토리로 이동하는 것입니다. 복사 또는 HDFS 파일 시스템 내에서 이동이 과도 파일을 피해야한다
from https://stackoverflow.com/questions/42041110/hdfs-java-io-filenotfoundexception-file-does-not-exist-name-copying by cc-by-sa and MIT license
'HADOOP' 카테고리의 다른 글
[HADOOP] 하둡 FS 저것 집어 넣어 명령을 사용할 때 왜 맵리 듀스가 실행되지 않습니다? (0) | 2019.10.06 |
---|---|
[HADOOP] 아파치 하둡 일반적인 실패 하둡을 구축하는 동안 (0) | 2019.10.06 |
[HADOOP] 하이브에서 선택 쿼리를 실행하는 동안 오류 (0) | 2019.10.06 |
[HADOOP] hiveQL에서 선택 중첩 (0) | 2019.10.06 |
[HADOOP] MySQL의와 하이브 JDBC 연결 설정 또는 매핑 (0) | 2019.10.06 |