Browse Source

refac: replace grou by with AccumulateWhileUnchanged

master
Christof Nolle 1 year ago
parent
commit
5831d8759c

+ 2
- 1
build.sbt View File

@@ -37,7 +37,8 @@ lazy val compileDependencies = {
"com.typesafe.akka" %% "akka-slf4j" % "2.5.12",
"com.typesafe.akka" %% "akka-http" % "10.1.3",
"com.typesafe.akka" %% "akka-actor" % "2.5.8",
"com.typesafe.akka" %% "akka-stream" % "2.5.8"
"com.typesafe.akka" %% "akka-stream" % "2.5.8",
"com.typesafe.akka" %% "akka-stream-contrib" % "0.9"
)
}


+ 3
- 3
src/main/scala/de/codingchallenge/Routes.scala View File

@@ -44,9 +44,9 @@ class Routes(articleExportService: ArticleExportService,
val exportRoute: Route =
path("articles") {
get {
println("test")
complete(articleExportService.exportArticles())
complete(StatusCodes.Accepted)
onSuccess(articleExportService.exportArticles()) { _ =>
complete(StatusCodes.NoContent)
}
}
}


+ 2
- 1
src/main/scala/de/codingchallenge/repositories/ProductExportRepository.scala View File

@@ -6,12 +6,13 @@ import akka.http.scaladsl.model.HttpEntity.Chunked
import akka.http.scaladsl.model._
import akka.stream.scaladsl.{Flow, Source}
import akka.util.ByteString
import com.typesafe.scalalogging.LazyLogging
import de.codingchallenge.csv.CsvOps._
import de.codingchallenge.models.ProductExport

import scala.concurrent.Future

class ProductExportRepository(actorSystem: ActorSystem) {
class ProductExportRepository(actorSystem: ActorSystem) extends LazyLogging {

val headerLine = "produktId|name|beschreibung|preis|summeBestand\n"


+ 6
- 6
src/main/scala/de/codingchallenge/services/ArticleExportService.scala View File

@@ -3,6 +3,7 @@ package de.codingchallenge.services
import akka.NotUsed
import akka.http.scaladsl.model.HttpResponse
import akka.stream.Materializer
import akka.stream.contrib.AccumulateWhileUnchanged
import akka.stream.scaladsl.{Sink, Source}
import com.typesafe.scalalogging.LazyLogging
import de.codingchallenge.models.{Article, ProductExport}
@@ -23,16 +24,15 @@ class ArticleExportService(
articleRepository
.getArticles(productsSize)
.filter(_.stock > 0)
.groupBy(1, _.productId)
.map(a => a -> a.stock)
.reduce[(Article, Int)] {
.map(a => (a, a.stock))
.via(new AccumulateWhileUnchanged[(Article, Int), String](_._1.productId))
.map(_.reduce[(Article, Int)] {
case ((a1, c1), (a2, c2)) if a1.price < a2.price => (a1, c1 + c2)
case ((a1, c1), (a2, c2)) if a1.price > a2.price => (a2, c1 + c2)
case ((a1, c1), (_, c2)) => (a1, c1 + c2)
}
.mergeSubstreams
})
.map { case (article, stockSum) =>
logger.debug(s"Reduced to article: $article and stockSum: $stockSum")
logger.info(s"Reduced to article: $article and stockSum: $stockSum")
ProductExport(article, stockSum) }
), productsSize)


Loading…
Cancel
Save