Apache Beam: 'Unable to find registrar for hdfs'
I am trying to run a pipeline with the Spark runner, with the data stored on a remote machine. The following command was used to submit the job:
./spark-submit --class org.apache.beam.examples.WordCount --master spark:// --deploy-mode cluster --supervise --executor-memory 2G --total-executor-cores 4 hdfs:// --runner=SparkRunner
It produces the following response:
Running Spark using the REST application submission protocol.
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
17/06/12 14:44:49 INFO RestSubmissionClient: Submitting a request to launch an application in spark://
17/06/12 14:44:49 INFO RestSubmissionClient: Submission successfully created as driver-20170612200920-0006. Polling submission state...
17/06/12 14:44:49 INFO RestSubmissionClient: Submitting a request for the status of submission driver-20170612200920-0006 in spark://
17/06/12 14:44:49 INFO RestSubmissionClient: State of driver driver-20170612200920-0006 is now RUNNING.
17/06/12 14:44:49 INFO RestSubmissionClient: Driver is running on worker worker-20170612193258- at
17/06/12 14:44:49 INFO RestSubmissionClient: Server responded with CreateSubmissionResponse:
"action" : "CreateSubmissionResponse",
"message" : "Driver successfully submitted as driver-20170612200920-0006",
"serverSparkVersion" : "1.6.3",
"submissionId" : "driver-20170612200920-0006",
"success" : true
However, the job is stuck in the 'RUNNING' state, and stderr shows the following exception along with other details:
Exception in thread "main" java.lang.reflect.InvocationTargetException
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.spark.deploy.worker.DriverWrapper$.main(DriverWrapper.scala:58)
at org.apache.spark.deploy.worker.DriverWrapper.main(DriverWrapper.scala)
Caused by: java.lang.IllegalStateException: Unable to find registrar for hdfs
at org.apache.beam.sdk.io.FileSystems.getFileSystemInternal(FileSystems.java:447)
at org.apache.beam.sdk.io.FileSystems.matchNewResource(FileSystems.java:523)
at org.apache.beam.sdk.io.FileBasedSink.convertToFileResourceIfPossible(FileBasedSink.java:204)
at org.apache.beam.sdk.io.TextIO$Write.to(TextIO.java:294)
at org.apache.beam.examples.WordCount.main(WordCount.java:132)
... 6 more
These are the plugins and dependencies used in my project:
<name>Apache Development Snapshot Repository</name>
<!-- <exclusions>
</exclusions> -->
<!-- Adds a dependency on the Beam Google Cloud Platform IO module. -->
<!-- Dependencies below this line are specific dependencies needed by the examples code. -->
<!-- Exclude an old version of guava that is being pulled
in by a transitive dependency of google-api-client -->
<!-- Exclude an old version of guava that is being pulled
in by a transitive dependency of google-api-client -->
<!-- Exclude an old version of guava that is being pulled
in by a transitive dependency of google-api-client -->
<!-- Exclude an old version of guava that is being pulled
in by a transitive dependency of google-api-client -->
<!-- Add slf4j API frontend binding with JUL backend -->
<!-- When loaded at runtime this will wire up slf4j to the JUL backend -->
<!-- Hamcrest and JUnit are required dependencies of PAssert,
which is used in the main code of DebuggingWordCount example. -->
<!-- The DirectRunner is needed for unit tests. -->
<!-- Ensure that the Maven jar plugin runs before the Maven
shade plugin by listing the plugin higher within the file. -->
<!-- Configures `mvn package` to produce a bundled jar ("fat jar") for runners
that require this for job submission to a cluster. -->
The fat jar does contain HadoopFileSystemRegistrar. Here is the source code of the WordCount class:
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.beam.examples;
import java.util.Collections;
import org.apache.beam.examples.common.ExampleUtils;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.hdfs.HadoopFileSystemOptions;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
//import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.Validation.Required;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.hadoop.conf.Configuration;

/**
 * An example that counts words in Shakespeare and includes Beam best practices.
 */
public class WordCount {

  static class ExtractWordsFn extends DoFn<String, String> {
    private final Counter emptyLines = Metrics
        .counter(ExtractWordsFn.class, "emptyLines");

    @ProcessElement
    public void processElement(ProcessContext c) {
      // Count empty lines through the metrics counter.
      if (c.element().trim().isEmpty()) {
        emptyLines.inc();
      }

      // Split the line into words.
      String[] words = c.element().split(ExampleUtils.TOKENIZER_PATTERN);

      // Output each word encountered into the output PCollection.
      for (String word : words) {
        if (!word.isEmpty()) {
          c.output(word);
        }
      }
    }
  }

  /** A SimpleFunction that converts a Word and Count into a printable string. */
  public static class FormatAsTextFn extends SimpleFunction<KV<String, Long>, String> {
    @Override
    public String apply(KV<String, Long> input) {
      return input.getKey() + ": " + input.getValue();
    }
  }

  public static class CountWords
      extends PTransform<PCollection<String>, PCollection<KV<String, Long>>> {
    @Override
    public PCollection<KV<String, Long>> expand(PCollection<String> lines) {
      // Convert lines of text into individual words.
      PCollection<String> words = lines.apply(ParDo.of(new ExtractWordsFn()));

      // Count the number of times each word occurs.
      PCollection<KV<String, Long>> wordCounts = words
          .apply(Count.<String>perElement());

      return wordCounts;
    }
  }

  /**
   * Options supported by {@link WordCount}. Concept #4: Defining your own
   * configuration options. Here, you can add your own arguments to be
   * processed by the command-line parser, and specify default values for
   * them. You can then access the options values in your pipeline code.
   *
   * Inherits standard configuration options.
   */
  public interface WordCountOptions extends HadoopFileSystemOptions {

    /**
     * By default, this example reads from a public dataset containing the
     * text of King Lear. Set this option to choose a different input file
     * or glob.
     */
    @Description("Path of the file to read from")
    String getInputFile();

    void setInputFile(String value);

    /**
     * Set this required option to specify where to write the output.
     */
    @Description("/home/ankit/kinglear_chandan.txt ")
    String getOutput();

    void setOutput(String value);
  }

  public static void main(String[] args) {
    String[] args1 = new String[] {
        "--hdfsConfiguration=[{\"fs.defaultFS\" : \"hdfs://\"}]", "--runner=SparkRunner"};
    WordCountOptions options = PipelineOptionsFactory
        .fromArgs(args1)
        .withValidation()
        .as(WordCountOptions.class);
    Pipeline p = Pipeline.create(options);

    p.apply("ReadLines", TextIO.read().from(options.getInputFile()))
        .apply(new CountWords())
        .apply(MapElements.via(new FormatAsTextFn()))
        .apply("WriteCounts", TextIO.write().to(options.getOutput()));

    p.run().waitUntilFinish();
  }
}
1. I had the same problem. Take a look at this JIRA ticket, https://issues.apache.org/jira/projects/BEAM/issues/BEAM-2429, and set the fs.defaultFS parameter so that HDFS paths can be handled. I hope this helps.
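As a rough illustration of the setting mentioned above, the sketch below builds the `--hdfsConfiguration` argument in the same JSON list-of-objects form that appears in the question's `main` method. The class name, the helper names, and the NameNode address `hdfs://namenode.example.com:8020` are made up for the example; only the flag name and the `fs.defaultFS` key come from the question.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

public class HdfsConfigurationArg {

  /** Renders a flat key/value map as --hdfsConfiguration=[{"k":"v",...}]. */
  static String hdfsConfigurationArg(Map<String, String> conf) {
    StringJoiner entries = new StringJoiner(",", "{", "}");
    for (Map.Entry<String, String> e : conf.entrySet()) {
      entries.add(quote(e.getKey()) + ":" + quote(e.getValue()));
    }
    return "--hdfsConfiguration=[" + entries + "]";
  }

  /** Minimal JSON string quoting: escapes backslashes and double quotes only. */
  static String quote(String s) {
    return "\"" + s.replace("\\", "\\\\").replace("\"", "\\\"") + "\"";
  }

  public static void main(String[] args) {
    Map<String, String> conf = new LinkedHashMap<>();
    // Hypothetical NameNode address; replace it with the cluster's real fs.defaultFS.
    conf.put("fs.defaultFS", "hdfs://namenode.example.com:8020");
    System.out.println(hdfsConfigurationArg(conf));
  }
}
```

Run as is, this prints `--hdfsConfiguration=[{"fs.defaultFS":"hdfs://namenode.example.com:8020"}]`, which can be passed to the pipeline alongside `--runner=SparkRunner`. Building the string in code avoids the shell-quoting mistakes that are easy to make when typing the JSON by hand.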
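2. If you have set fs.defaultFS as described in the ticket linked in the accepted answer but still run into the problem (as was the case for me), the root cause may be different.
It may be that the Java ServiceLoader cannot find HadoopFileSystemRegistrar. In that case, you may need to change the way your executable jar is assembled. The answer to a similar question (about gs instead of hdfs) gave me the solution.
Edit: I am not using Spark but Flink, and I run the pipeline's main class directly with 'yarn jar'.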
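One way to check the ServiceLoader theory is to list what the classpath actually registers. The sketch below is a diagnostic under assumptions, not part of Beam: it reads every `META-INF/services/<service>` file visible to a class loader and returns the implementation names found. The default service name `org.apache.beam.sdk.io.FileSystemRegistrar` is my assumption about the interface that HadoopFileSystemRegistrar implements; pass a different name as the first argument if your Beam version differs.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.UncheckedIOException;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class ServiceFileCheck {

  /** Strips a trailing '#' comment and whitespace; returns "" for blank or comment-only lines. */
  static String parseServiceLine(String line) {
    int hash = line.indexOf('#');
    return (hash >= 0 ? line.substring(0, hash) : line).trim();
  }

  /** Collects implementation names from every META-INF/services/<serviceName> file on the loader. */
  static List<String> registeredImplementations(String serviceName, ClassLoader loader) {
    List<String> names = new ArrayList<>();
    try {
      for (URL url : Collections.list(loader.getResources("META-INF/services/" + serviceName))) {
        try (BufferedReader reader = new BufferedReader(
            new InputStreamReader(url.openStream(), StandardCharsets.UTF_8))) {
          String line;
          while ((line = reader.readLine()) != null) {
            String name = parseServiceLine(line);
            if (!name.isEmpty()) {
              names.add(name);
            }
          }
        }
      }
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    return names;
  }

  public static void main(String[] args) {
    // Assumed interface name; override it on the command line if needed.
    String service = args.length > 0 ? args[0] : "org.apache.beam.sdk.io.FileSystemRegistrar";
    List<String> found =
        registeredImplementations(service, ServiceFileCheck.class.getClassLoader());
    System.out.println(found.isEmpty() ? "no registrations found for " + service : found.toString());
  }
}
```

Running it with the fat jar on the classpath shows whether the Hadoop registrar is listed. If the class file is inside the jar but its name is missing from the service file, the jar assembly step has most likely dropped or overwritten the entry.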
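3. I don't see a dependency on beam-sdks-java-core, which is a pretty important package. Perhaps it is being pulled in transitively?
The Maven archetype may be a good place to start, and to compare dependencies against in order to spot differences: https://beam.apache.org/get-started/quickstart-java/ has information on how to generate a project using the word count archetype.
I would:
Hopefully one of those suggestions helps.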
4. You need to modify the following in your WordCount example to make it work
This is my pom file
<properties>
  <beam.version>2.0.0</beam.version>
  <surefire-plugin.version>2.20</surefire-plugin.version>
</properties>

<repositories>
  <repository>
    <id>apache.snapshots</id>
    <name>Apache Development Snapshot Repository</name>
    <url>https://repository.apache.org/content/repositories/snapshots/</url>
    <releases>
      <enabled>false</enabled>
    </releases>
    <snapshots>
      <enabled>true</enabled>
    </snapshots>
  </repository>
</repositories>

<dependencies>
  <dependency>
    <groupId>org.apache.beam</groupId>
    <artifactId>beam-runners-spark</artifactId>
    <version>${beam.version}</version>
    <scope>runtime</scope>
  </dependency>
  <dependency>
    <groupId>org.apache.beam</groupId>
    <artifactId>beam-sdks-java-io-hadoop-file-system</artifactId>
    <version>${beam.version}</version>
    <scope>runtime</scope>
  </dependency>
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_2.10</artifactId>
    <version>1.6.2</version>
    <scope>runtime</scope>
    <exclusions>
      <exclusion>
        <groupId>org.slf4j</groupId>
        <artifactId>jul-to-slf4j</artifactId>
      </exclusion>
    </exclusions>
  </dependency>
  <dependency>
    <groupId>org.apache.beam</groupId>
    <artifactId>beam-runners-flink_2.10</artifactId>
    <version>${beam.version}</version>
    <scope>runtime</scope>
  </dependency>
  <dependency>
    <groupId>com.fasterxml.jackson.module</groupId>
    <artifactId>jackson-module-scala_2.10</artifactId>
    <version>2.8.8</version>
    <scope>runtime</scope>
  </dependency>
  <dependency>
    <groupId>org.apache.beam</groupId>
    <artifactId>beam-sdks-java-core</artifactId>
    <version>${beam.version}</version>
  </dependency>
  <!-- Adds a dependency on the Beam Google Cloud Platform IO module. -->
  <dependency>
    <groupId>org.apache.beam</groupId>
    <artifactId>beam-sdks-java-io-google-cloud-platform</artifactId>
    <version>${beam.version}</version>
  </dependency>
  <!-- Dependencies below this line are specific dependencies needed by the examples code. -->
  <dependency>
    <groupId>com.google.api-client</groupId>
    <artifactId>google-api-client</artifactId>
    <version>1.22.0</version>
    <exclusions>
      <!-- Exclude an old version of guava that is being pulled
           in by a transitive dependency of google-api-client -->
      <exclusion>
        <groupId>com.google.guava</groupId>
        <artifactId>guava-jdk5</artifactId>
      </exclusion>
    </exclusions>
  </dependency>
  <dependency>
    <groupId>com.google.apis</groupId>
    <artifactId>google-api-services-bigquery</artifactId>
    <version>v2-rev295-1.22.0</version>
    <exclusions>
      <!-- Exclude an old version of guava that is being pulled
           in by a transitive dependency of google-api-client -->
      <exclusion>
        <groupId>com.google.guava</groupId>
        <artifactId>guava-jdk5</artifactId>
      </exclusion>
    </exclusions>
  </dependency>
  <dependency>
    <groupId>com.google.http-client</groupId>
    <artifactId>google-http-client</artifactId>
    <version>1.22.0</version>
    <exclusions>
      <!-- Exclude an old version of guava that is being pulled
           in by a transitive dependency of google-api-client -->
      <exclusion>
        <groupId>com.google.guava</groupId>
        <artifactId>guava-jdk5</artifactId>
      </exclusion>
    </exclusions>
  </dependency>
  <dependency>
    <groupId>com.google.apis</groupId>
    <artifactId>google-api-services-pubsub</artifactId>
    <version>v1-rev10-1.22.0</version>
    <exclusions>
      <!-- Exclude an old version of guava that is being pulled
           in by a transitive dependency of google-api-client -->
      <exclusion>
        <groupId>com.google.guava</groupId>
        <artifactId>guava-jdk5</artifactId>
      </exclusion>
    </exclusions>
  </dependency>
  <dependency>
    <groupId>joda-time</groupId>
    <artifactId>joda-time</artifactId>
    <version>2.4</version>
  </dependency>
  <dependency>
    <groupId>com.google.guava</groupId>
    <artifactId>guava</artifactId>
    <version>20.0</version>
  </dependency>
  <!-- Add slf4j API frontend binding with JUL backend -->
  <dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-api</artifactId>
    <version>1.7.14</version>
  </dependency>
  <dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-jdk14</artifactId>
    <version>1.7.14</version>
    <!-- When loaded at runtime this will wire up slf4j to the JUL backend -->
    <scope>runtime</scope>
  </dependency>
  <!-- Hamcrest and JUnit are required dependencies of PAssert,
       which is used in the main code of DebuggingWordCount example. -->
  <dependency>
    <groupId>org.hamcrest</groupId>
    <artifactId>hamcrest-all</artifactId>
    <version>1.3</version>
  </dependency>
  <dependency>
    <groupId>junit</groupId>
    <artifactId>junit</artifactId>
    <version>4.12</version>
  </dependency>
  <dependency>
    <groupId>org.apache.beam</groupId>
    <artifactId>beam-sdks-java-io-hadoop-common</artifactId>
    <version>${beam.version}</version>
  </dependency>
  <dependency>
    <groupId>org.apache.beam</groupId>
    <artifactId>beam-sdks-java-io-hadoop-file-system</artifactId>
    <version>${beam.version}</version>
  </dependency>
  <dependency>
    <groupId>org.apache.beam</groupId>
    <artifactId>beam-sdks-java-io-hadoop-input-format</artifactId>
    <version>${beam.version}</version>
  </dependency>
  <!-- The DirectRunner is needed for unit tests. -->
  <dependency>
    <groupId>org.apache.beam</groupId>
    <artifactId>beam-runners-direct-java</artifactId>
    <version>${beam.version}</version>
    <scope>test</scope>
  </dependency>
  <dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-common</artifactId>
    <version>3.0.0-alpha2</version>
  </dependency>
</dependencies>

<build>
  <plugins>
    <plugin>
      <groupId>org.apache.maven.plugins</groupId>
      <artifactId>maven-surefire-plugin</artifactId>
      <version>${surefire-plugin.version}</version>
      <configuration>
        <parallel>all</parallel>
        <threadCount>4</threadCount>
        <redirectTestOutputToFile>true</redirectTestOutputToFile>
      </configuration>
      <dependencies>
        <dependency>
          <groupId>org.apache.maven.surefire</groupId>
          <artifactId>surefire-junit47</artifactId>
          <version>${surefire-plugin.version}</version>
        </dependency>
      </dependencies>
    </plugin>
    <!-- Ensure that the Maven jar plugin runs before the Maven
         shade plugin by listing the plugin higher within the file. -->
    <plugin>
      <groupId>org.apache.maven.plugins</groupId>
      <artifactId>maven-jar-plugin</artifactId>
    </plugin>
    <!-- Configures `mvn package` to produce a bundled jar ("fat jar") for runners
         that require this for job submission to a cluster. -->
    <plugin>
      <groupId>org.apache.maven.plugins</groupId>
      <artifactId>maven-shade-plugin</artifactId>
      <version>3.0.0</version>
      <executions>
        <execution>
          <phase>package</phase>
          <goals>
            <goal>shade</goal>
          </goals>
          <configuration>
            <filters>
              <filter>
                <artifact>*:*</artifact>
                <excludes>
                  <exclude>META-INF/LICENSE</exclude>
                  <exclude>META-INF/*.SF</exclude>
                  <exclude>META-INF/*.DSA</exclude>
                  <exclude>META-INF/*.RSA</exclude>
                </excludes>
              </filter>
            </filters>
            <transformers>
              <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
            </transformers>
          </configuration>
        </execution>
      </executions>
    </plugin>
  </plugins>
  <pluginManagement>
    <plugins>
      <plugin>
        <groupId>org.codehaus.mojo</groupId>
        <artifactId>exec-maven-plugin</artifactId>
        <version>1.4.0</version>
        <configuration>
          <cleanupDaemonThreads>false</cleanupDaemonThreads>
        </configuration>
      </plugin>
    </plugins>
  </pluginManagement>
</build>
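That solved my problem. I would still like to see an answer from someone with more Apache Beam experience. From a design perspective, given that the purpose of Apache Beam is to add an extra level of abstraction over engine / source / target / transformation, it is a bit disappointing that the framework cannot infer the source type and create the corresponding underlying instance by itself.
I hope this helps.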
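The pom above configures the shade plugin with ServicesResourceTransformer. The sketch below illustrates the idea behind that setting and is not the plugin's actual code: when several jars each ship a `META-INF/services` file with the same name, keeping only one copy loses registrations, while merging keeps them all. The class and method names are made up, and the registrar class names in `main` are assumed examples of what two Beam jars might list.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

public class ServiceFileMerge {

  /** Models a fat jar built without merging: only the first same-named service file survives. */
  static List<String> keepFirstOnly(List<List<String>> serviceFiles) {
    return serviceFiles.isEmpty()
        ? new ArrayList<>()
        : new ArrayList<>(serviceFiles.get(0));
  }

  /** Models merging: all entries are concatenated, duplicates dropped, first-seen order kept. */
  static List<String> merge(List<List<String>> serviceFiles) {
    Set<String> merged = new LinkedHashSet<>();
    for (List<String> file : serviceFiles) {
      merged.addAll(file);
    }
    return new ArrayList<>(merged);
  }

  public static void main(String[] args) {
    // Assumed contents of the same service file in two different jars.
    List<String> fromCoreJar = Arrays.asList("org.apache.beam.sdk.io.LocalFileSystemRegistrar");
    List<String> fromHadoopJar =
        Arrays.asList("org.apache.beam.sdk.io.hdfs.HadoopFileSystemRegistrar");
    List<List<String>> files = Arrays.asList(fromCoreJar, fromHadoopJar);

    System.out.println("without merging: " + keepFirstOnly(files));
    System.out.println("with merging:    " + merge(files));
  }
}
```

In the unmerged case the Hadoop registrar never reaches the final service file, so the lookup for the hdfs scheme fails even though the class itself is inside the jar, which would match the symptom in the question.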
from https://stackoverflow.com/questions/44497662/apache-beamunable-to-find-registrar-for-hdfs by cc-by-sa and MIT license
