You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

ProductExportRepository.scala 1.6KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354
  1. package de.codingchallenge.repositories
  2. import akka.NotUsed
  3. import akka.actor.ActorSystem
  4. import akka.http.scaladsl.Http
  5. import akka.http.scaladsl.model._
  6. import akka.stream.scaladsl.{Flow, Source}
  7. import akka.stream.{ActorMaterializer, Materializer}
  8. import akka.util.ByteString
  9. import com.typesafe.scalalogging.LazyLogging
  10. import de.codingchallenge.csv.CsvOps._
  11. import de.codingchallenge.models.ProductExport
  12. import scala.concurrent.{ExecutionContext, Future}
  13. class ProductExportRepository(actorSystem: ActorSystem)(implicit ec: ExecutionContext) extends LazyLogging {
  14. private implicit val as: ActorSystem = actorSystem
  15. private implicit val mat: Materializer = ActorMaterializer()
  16. val headerLine = "produktId|name|beschreibung|preis|summeBestand"
  17. val baseUrl = "http://localhost:8080"
  18. def add(p: Source[ProductExport, NotUsed], articlesSize: Int): Future[HttpResponse] = {
  19. val sourceWithHeader = p.via(csvFlow)
  20. .prepend(Source.single(headerLine))
  21. .intersperse("\n")
  22. .map(ByteString.apply)
  23. /**
  24. * Setting charset to utf8 results in HTTP.406
  25. */
  26. Http()
  27. .singleRequest(
  28. HttpRequest(
  29. method = HttpMethods.PUT,
  30. uri = s"$baseUrl/products/$articlesSize",
  31. entity = HttpEntity(ContentType.WithMissingCharset(MediaTypes.`text/csv`), sourceWithHeader)
  32. ))
  33. .map { res =>
  34. logger.info(s"Server responded with $res")
  35. res
  36. }
  37. }
  38. private val csvFlow: Flow[ProductExport, String, NotUsed] =
  39. Flow.fromFunction { p =>
  40. logger.info(s"streaming export record $p")
  41. p.toCsvLine
  42. }
  43. }