You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

ArticleExportService.scala 1.8KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849
  1. package de.codingchallenge.services
  2. import akka.NotUsed
  3. import akka.http.scaladsl.model.HttpResponse
  4. import akka.stream.Materializer
  5. import akka.stream.contrib.AccumulateWhileUnchanged
  6. import akka.stream.scaladsl.{Sink, Source}
  7. import com.typesafe.scalalogging.LazyLogging
  8. import de.codingchallenge.models.{Article, ProductExport}
  9. import de.codingchallenge.repositories.{ArticleRepository, ProductExportRepository}
  10. import scala.concurrent.Future
  /**
    * Builds a product export from the article stream and hands it to the
    * [[ProductExportRepository]].
    *
    * @param articleRepository       source of the article stream
    * @param productExportRepository receiver of the generated product export
    * @param mat                     materializer re-exposed as an implicit member of this service
    */
  class ArticleExportService(
    articleRepository: ArticleRepository,
    productExportRepository: ProductExportRepository,
    mat: Materializer
  ) extends LazyLogging {

    // Re-exposes the constructor's materializer as an implicit member.
    implicit val m: Materializer = mat

    // Passed unchanged to both `getArticles` and `add`; its exact meaning is
    // defined by those repositories (NOTE(review): presumably a product count — confirm).
    val productsSize: Int = 100

    /**
      * Streams the articles and turns them into one [[ProductExport]] per run of
      * consecutive articles that share a `productId`.
      *
      * Pipeline steps:
      *  - articles whose stock is not positive are dropped;
      *  - each remaining article is paired with its own stock value;
      *  - pairs are accumulated for as long as the `productId` stays the same
      *    (a `groupBy` is avoided because it would keep the substream open,
      *    waiting for further matching articles);
      *  - every accumulated run is collapsed into the lowest-priced article plus
      *    the sum of all stock values in the run;
      *  - the collapsed pair is logged and wrapped into a `ProductExport`.
      *
      * @return the response obtained from `productExportRepository.add`
      */
    def exportArticles(): Future[HttpResponse] = {
      val products = articleRepository
        .getArticles(productsSize)
        .filter(article => article.stock > 0)
        .map(article => article -> article.stock)
        .via(new AccumulateWhileUnchanged[(Article, Int), String]({ case (article, _) => article.productId }))
        .map(sameProduct => sameProduct.reduce[(Article, Int)](mergeKeepingCheapest))
        .map { case (article, stockSum) =>
          logger.info(s"Reduced to article: $article and stockSum: $stockSum")
          ProductExport(article, stockSum)
        }

      productExportRepository.add(Source.fromGraph[ProductExport, NotUsed](products), productsSize)
    }

    /**
      * Combines two (article, stock) pairs: keeps the article with the lower price
      * (the left one on a tie) and adds up both stock values.
      */
    private def mergeKeepingCheapest(left: (Article, Int), right: (Article, Int)): (Article, Int) = {
      val (leftArticle, leftStock) = left
      val (rightArticle, rightStock) = right
      val cheapest = if (leftArticle.price > rightArticle.price) rightArticle else leftArticle
      (cheapest, leftStock + rightStock)
    }
  }