
[SCALA] 단일 스레드 구현보다는 Akka - 스트림 구현 속도가 느린


단일 스레드 구현보다는 Akka - 스트림 구현 속도가 느린

2015년 10월 30일에서 업데이트

롤랜드 쿤의 답변에 따라 :

나는 일부 변경했다. 내 코드는 지금과 같다 :

object MultiThread {
  implicit val actorSystem = ActorSystem("Sys")
  implicit val materializer = ActorMaterializer()

  var counter = 0
  var oldProgess = 0

  //RunnableFlow: in -> flow -> sink
  val in = Source(() => Iterator.continually((1254785478l, "name", 48, 23.09f)))

  val flow = Flow[(Long, String, Int, Float)].map(p => SharedFunctions.transform2(SharedFunctions.transform(p)))

  val tupleToEvent = Flow[(Long, String, Int, Float)].map(SharedFunctions.transform)

  val eventToFactorial = Flow[Event].map(SharedFunctions.transform2)

  val eventChef: Flow[(Long, String, Int, Float), Int, Unit] = Flow() { implicit builder =>
    import FlowGraph.Implicits._

    val dispatchTuple = builder.add(Balance[(Long, String, Int, Float)](4))
    val mergeEvents = builder.add(Merge[Int](4))

    dispatchTuple.out(0) ~> tupleToEvent ~> eventToFactorial ~> mergeEvents.in(0)
    dispatchTuple.out(1) ~> tupleToEvent ~> eventToFactorial ~> mergeEvents.in(1)
    dispatchTuple.out(2) ~> tupleToEvent ~> eventToFactorial ~> mergeEvents.in(2)
    dispatchTuple.out(3) ~> tupleToEvent ~> eventToFactorial ~> mergeEvents.in(3)

    (dispatchTuple.in, mergeEvents.out)

  val sink = Sink.foreach[Int]{
    v => counter += 1
    oldProgess = SharedFunctions.printProgress(oldProgess, SharedFunctions.maxEventCount, counter,
    DateTime.now.getMillis - SharedFunctions.startTime.getMillis)
    if(counter == SharedFunctions.maxEventCount) endAkka()

  def endAkka() = {
    val duration = new Duration(SharedFunctions.startTime, DateTime.now)
    println("Time: " + duration.getMillis + " || Data: " + counter)

  def main(args: Array[String]) {
    println("MultiThread started: " + SharedFunctions.startTime)
   // in.via(eventChef).runWith(sink)


나는 어떤 부문을 akka와 구현을 수행하여, 예를 들어 작업을 늘리면 : 나는 여전히 완전히 뭔가 잘못,하지만 내 구현을 얻을 확실 akka - 스트림하지 내가 훨씬 느린 (지금도 느린 이전과)하지만 내가 발견하는 것은입니다 -streams 빨리 가져옵니다. 내가 바로 그것을 얻을 경우 그래서 내 예제에서 너무 많은 오버 헤드가 보인다 (다른 날 수정). 당신은 단지에서 이익을 얻을 수 있도록 코드가 무거운 작업을 할 경우 akka을-스트림?

나는 스칼라 및 akka 스트림 모두에서 비교적 새로운 해요. 나는 카운터가 특정 번호에 도달 할 때까지 몇 가지 이벤트를 생성하는 작은 테스트 프로젝트를 썼다. 각각의 이벤트에 대한 이벤트의 한 필드의 요인이 계산되고있다. 나는이 두 번을 구현했습니다. akka 스트림과 함께 한 시간 akka 스트림 (단일 스레드) 및 비교 실행하지 않고 한 번.

나는 그것을 기대하지 않았다 : 나는 하나의 이벤트를 만들 때 두 프로그램의 실행은 거의 동일합니다. 내가 akka 스트림없이 70,000,000 이벤트 구현을 작성한다면 훨씬 빠릅니다. 여기 내 결과 (다음과 같은 데이터가 24 개 측정에 근거)가 있습니다 :

무슨 일이야 : 내 질문은 그래서? 왜 akka 스트림 느린 내 구현은?

내 코드에 여기 :

Akka와 구현

object MultiThread {
  implicit val actorSystem = ActorSystem("Sys")
  implicit val materializer = ActorMaterializer()

  var counter = 0
  var oldProgess = 0

  //RunnableFlow: in -> flow -> sink
  val in = Source(() => Iterator.continually((1254785478l, "name", 48, 23.09f)))

  val flow = Flow[(Long, String, Int, Float)].map(p => SharedFunctions.transform2(SharedFunctions.transform(p)))

  val sink = Sink.foreach[Int]{
    v => counter += 1
    oldProgess = SharedFunctions.printProgress(oldProgess, SharedFunctions.maxEventCount, counter,
    DateTime.now.getMillis - SharedFunctions.startTime.getMillis)
    if(counter == SharedFunctions.maxEventCount) endAkka()

  def endAkka() = {
    val duration = new Duration(SharedFunctions.startTime, DateTime.now)
    println("Time: " + duration.getMillis + " || Data: " + counter)

  def main(args: Array[String]) {
    import scala.concurrent.ExecutionContext.Implicits.global
    println("MultiThread started: " + SharedFunctions.startTime)
    in.via(flow).runWith(sink).onComplete(_ => endAkka())


Akka없이 구현

객체 SingleThread {

  def main(args: Array[String]) {
    println("SingleThread started at: " + SharedFunctions.startTime)
    val i = createEvent(0)
    val duration = new Duration(SharedFunctions.startTime, DateTime.now());
    println("Time: " + duration.getMillis + " || Data: " + i)

  def createEventWorker(oldProgress: Int, count: Int, randDate: Long, name: String, age: Int, myFloat: Float): Int = {
    if (count == SharedFunctions.maxEventCount) count
    else {
      val e = SharedFunctions.transform((randDate, name, age, myFloat))
      val p = SharedFunctions.printProgress(oldProgress, SharedFunctions.maxEventCount, count,
        DateTime.now.getMillis - SharedFunctions.startTime.getMillis)
      createEventWorker(p, count + 1, 1254785478l, "name", 48, 23.09f)

  def createEvent(count: Int): Int = {
    createEventWorker(0, count, 1254785478l, "name", 48, 23.09f)


object SharedFunctions {
  val maxEventCount = 70000000
  val startTime = DateTime.now

  def transform(t : (Long, String, Int, Float)) : Event = new Event(t._1 ,t._2,t._3,t._4)
  def transform2(e : Event) : Int = factorial(e.getAgeYrs)

  def calculatePercentage(totalValue: Long, currentValue: Long) = Math.round((currentValue * 100) / totalValue)
  def printProgress(oldProgress : Int, fileSize: Long, currentSize: Int, t: Long) = {
    val cProgress = calculatePercentage(fileSize, currentSize)
    if (oldProgress != cProgress) println(s"$oldProgress% | $t ms")

  private def factorialWorker(n1: Int, n2: Int): Int = {
    if (n1 == 0) n2
    else factorialWorker(n1 -1, n2*n1)
  def factorial (n : Int): Int = {
    factorialWorker(n, 1)

구현 이벤트

 * Autogenerated by Avro

public class Event extends org.apache.avro.specific.SpecificRecordBase implements org.apache.avro.specific.SpecificRecord {
  public static final org.apache.avro.Schema SCHEMA$ = new org.apache.avro.Schema.Parser().parse("{\"type\":\"record\",\"name\":\"Event\",\"namespace\":\"week2P2\",\"fields\":[{\"name\":\"timestampMS\",\"type\":\"long\"},{\"name\":\"name\",\"type\":\"string\"},{\"name\":\"ageYrs\",\"type\":\"int\"},{\"name\":\"sizeCm\",\"type\":\"float\"}]}");
  public static org.apache.avro.Schema getClassSchema() { return SCHEMA$; }
  @Deprecated public long timestampMS;
  @Deprecated public CharSequence name;
  @Deprecated public int ageYrs;
  @Deprecated public float sizeCm;

   * Default constructor.  Note that this does not initialize fields
   * to their default values from the schema.  If that is desired then
   * one should use <code>newBuilder()</code>. 
  public Event() {}

   * All-args constructor.
  public Event(Long timestampMS, CharSequence name, Integer ageYrs, Float sizeCm) {
    this.timestampMS = timestampMS;
    this.name = name;
    this.ageYrs = ageYrs;
    this.sizeCm = sizeCm;

  public org.apache.avro.Schema getSchema() { return SCHEMA$; }
  // Used by DatumWriter.  Applications should not call. 
  public Object get(int field$) {
    switch (field$) {
    case 0: return timestampMS;
    case 1: return name;
    case 2: return ageYrs;
    case 3: return sizeCm;
    default: throw new org.apache.avro.AvroRuntimeException("Bad index");
  // Used by DatumReader.  Applications should not call. 
  public void put(int field$, Object value$) {
    switch (field$) {
    case 0: timestampMS = (Long)value$; break;
    case 1: name = (CharSequence)value$; break;
    case 2: ageYrs = (Integer)value$; break;
    case 3: sizeCm = (Float)value$; break;
    default: throw new org.apache.avro.AvroRuntimeException("Bad index");

   * Gets the value of the 'timestampMS' field.
  public Long getTimestampMS() {
    return timestampMS;

   * Sets the value of the 'timestampMS' field.
   * @param value the value to set.
  public void setTimestampMS(Long value) {
    this.timestampMS = value;

   * Gets the value of the 'name' field.
  public CharSequence getName() {
    return name;

   * Sets the value of the 'name' field.
   * @param value the value to set.
  public void setName(CharSequence value) {
    this.name = value;

   * Gets the value of the 'ageYrs' field.
  public Integer getAgeYrs() {
    return ageYrs;

   * Sets the value of the 'ageYrs' field.
   * @param value the value to set.
  public void setAgeYrs(Integer value) {
    this.ageYrs = value;

   * Gets the value of the 'sizeCm' field.
  public Float getSizeCm() {
    return sizeCm;

   * Sets the value of the 'sizeCm' field.
   * @param value the value to set.
  public void setSizeCm(Float value) {
    this.sizeCm = value;

  /** Creates a new Event RecordBuilder */
  public static Event.Builder newBuilder() {
    return new Event.Builder();

  /** Creates a new Event RecordBuilder by copying an existing Builder */
  public static Event.Builder newBuilder(Event.Builder other) {
    return new Event.Builder(other);

  /** Creates a new Event RecordBuilder by copying an existing Event instance */
  public static Event.Builder newBuilder(Event other) {
    return new Event.Builder(other);

   * RecordBuilder for Event instances.
  public static class Builder extends org.apache.avro.specific.SpecificRecordBuilderBase<Event>
    implements org.apache.avro.data.RecordBuilder<Event> {

    private long timestampMS;
    private CharSequence name;
    private int ageYrs;
    private float sizeCm;

    /** Creates a new Builder */
    private Builder() {

    /** Creates a Builder by copying an existing Builder */
    private Builder(Event.Builder other) {
      if (isValidValue(fields()[0], other.timestampMS)) {
        this.timestampMS = data().deepCopy(fields()[0].schema(), other.timestampMS);
        fieldSetFlags()[0] = true;
      if (isValidValue(fields()[1], other.name)) {
        this.name = data().deepCopy(fields()[1].schema(), other.name);
        fieldSetFlags()[1] = true;
      if (isValidValue(fields()[2], other.ageYrs)) {
        this.ageYrs = data().deepCopy(fields()[2].schema(), other.ageYrs);
        fieldSetFlags()[2] = true;
      if (isValidValue(fields()[3], other.sizeCm)) {
        this.sizeCm = data().deepCopy(fields()[3].schema(), other.sizeCm);
        fieldSetFlags()[3] = true;

    /** Creates a Builder by copying an existing Event instance */
    private Builder(Event other) {
      if (isValidValue(fields()[0], other.timestampMS)) {
        this.timestampMS = data().deepCopy(fields()[0].schema(), other.timestampMS);
        fieldSetFlags()[0] = true;
      if (isValidValue(fields()[1], other.name)) {
        this.name = data().deepCopy(fields()[1].schema(), other.name);
        fieldSetFlags()[1] = true;
      if (isValidValue(fields()[2], other.ageYrs)) {
        this.ageYrs = data().deepCopy(fields()[2].schema(), other.ageYrs);
        fieldSetFlags()[2] = true;
      if (isValidValue(fields()[3], other.sizeCm)) {
        this.sizeCm = data().deepCopy(fields()[3].schema(), other.sizeCm);
        fieldSetFlags()[3] = true;

    /** Gets the value of the 'timestampMS' field */
    public Long getTimestampMS() {
      return timestampMS;

    /** Sets the value of the 'timestampMS' field */
    public Event.Builder setTimestampMS(long value) {
      validate(fields()[0], value);
      this.timestampMS = value;
      fieldSetFlags()[0] = true;
      return this; 

    /** Checks whether the 'timestampMS' field has been set */
    public boolean hasTimestampMS() {
      return fieldSetFlags()[0];

    /** Clears the value of the 'timestampMS' field */
    public Event.Builder clearTimestampMS() {
      fieldSetFlags()[0] = false;
      return this;

    /** Gets the value of the 'name' field */
    public CharSequence getName() {
      return name;

    /** Sets the value of the 'name' field */
    public Event.Builder setName(CharSequence value) {
      validate(fields()[1], value);
      this.name = value;
      fieldSetFlags()[1] = true;
      return this; 

    /** Checks whether the 'name' field has been set */
    public boolean hasName() {
      return fieldSetFlags()[1];

    /** Clears the value of the 'name' field */
    public Event.Builder clearName() {
      name = null;
      fieldSetFlags()[1] = false;
      return this;

    /** Gets the value of the 'ageYrs' field */
    public Integer getAgeYrs() {
      return ageYrs;

    /** Sets the value of the 'ageYrs' field */
    public Event.Builder setAgeYrs(int value) {
      validate(fields()[2], value);
      this.ageYrs = value;
      fieldSetFlags()[2] = true;
      return this; 

    /** Checks whether the 'ageYrs' field has been set */
    public boolean hasAgeYrs() {
      return fieldSetFlags()[2];

    /** Clears the value of the 'ageYrs' field */
    public Event.Builder clearAgeYrs() {
      fieldSetFlags()[2] = false;
      return this;

    /** Gets the value of the 'sizeCm' field */
    public Float getSizeCm() {
      return sizeCm;

    /** Sets the value of the 'sizeCm' field */
    public Event.Builder setSizeCm(float value) {
      validate(fields()[3], value);
      this.sizeCm = value;
      fieldSetFlags()[3] = true;
      return this; 

    /** Checks whether the 'sizeCm' field has been set */
    public boolean hasSizeCm() {
      return fieldSetFlags()[3];

    /** Clears the value of the 'sizeCm' field */
    public Event.Builder clearSizeCm() {
      fieldSetFlags()[3] = false;
      return this;

    public Event build() {
      try {
        Event record = new Event();
        record.timestampMS = fieldSetFlags()[0] ? this.timestampMS : (Long) defaultValue(fields()[0]);
        record.name = fieldSetFlags()[1] ? this.name : (CharSequence) defaultValue(fields()[1]);
        record.ageYrs = fieldSetFlags()[2] ? this.ageYrs : (Integer) defaultValue(fields()[2]);
        record.sizeCm = fieldSetFlags()[3] ? this.sizeCm : (Float) defaultValue(fields()[3]);
        return record;
      } catch (Exception e) {
        throw new org.apache.avro.AvroRuntimeException(e);


  1. ==============================

    1.나는 완전히 동의 롤랜드의 설명에 더하여, akka 스트림 단지 동시 프로그래밍 프레임 워크 아닌 것으로 이해되어야한다. 또한 싱크대에서이를 처리 할 필요가있을 때 수단 이벤트는 소스에 의해 생성되는 배압을 제공하는 스트리밍합니다. 요청이 통신은 각각의 공정 단계에서의 오버 헤드를 추가한다.

    따라서 귀하의 단일 스레드 및 멀티 스레드 비교하지 "사과 - 투 - 사과"입니다.

    당신이 선물 한 후 원시 멀티 스레드 실행 성능을 원하는 경우 / 배우 갈 수있는 더 좋은 방법입니다.

  2. ==============================

    2.Akka 스트림은 스트림의 처리 단계를 구현하는 액터 사이를 지나는 비동기 메시지를 사용한다. 비동기 경계를 통해 데이터를 전달하면 당신이 여기에서보고되는 오버 헤드를 가지고 : 당신의 계산은 스트리밍 솔루션은 메시지 전달에 의해 지배된다 요소 당 1μs의 대략 걸리는 동안 (단일 스레드 측정에서 파생 된) 약 160ns을 보인다.

    Akka 스트림은 스트림의 처리 단계를 구현하는 액터 사이를 지나는 비동기 메시지를 사용한다. 비동기 경계를 통해 데이터를 전달하면 당신이 여기에서보고되는 오버 헤드를 가지고 : 당신의 계산은 스트리밍 솔루션은 메시지 전달에 의해 지배된다 요소 당 1μs의 대략 걸리는 동안 (단일 스레드 측정에서 파생 된) 약 160ns을 보인다.

    어떤 혜택이 원시적 인 단일 스레드 솔루션을 통해 예상 할 수 있도록 코드에서 모든 계산이 하나의 배우 (지도 단계)에서 순차적으로 실행 : 또 다른 오해는 "스트림"을 말하는 것은 병렬 처리를 의미한다는 것이다.

    Akka 스트림에 의해 제공되는 병렬 처리의 혜택을 위해 당신은 각 요소마다의 작업> 1μs의 수행도 문서를 볼 수 있음을 여러 가공 단계가 필요합니다.

  3. from https://stackoverflow.com/questions/33416891/akka-stream-implementation-slower-than-single-threaded-implementation by cc-by-sa and MIT license