Scala Mongo example with akka-http and akka-stream

Introduction

In this tutorial we shall create a CRUD web application in Scala. We will use the MongoDB Scala Driver, the officially supported Scala driver for MongoDB, to communicate with the Mongo database. Along with that, we shall create a REST API using the akka-http module and akka-streams.

Prerequisites

  • Scala version: 2.13.1
  • Database: Mongo:3.4.23-xenial

Other dependencies

libraryDependencies ++= Seq(
"com.typesafe.akka" %% "akka-actor" % "2.5.26",
"com.typesafe.akka" %% "akka-http" % "10.1.11",
"com.typesafe.akka" %% "akka-stream" % "2.5.26",
"ch.rasc" % "bsoncodec" % "1.0.1",
"org.mongodb.scala" %% "mongo-scala-driver" % "2.7.0",
"com.typesafe.akka" %% "akka-http-spray-json" % "10.1.11"
)

About the service/project

The application should be able to perform CRUD operations on a MongoDB document and should also provide REST APIs to access and modify the data. It is an employee management application: employee details are stored in a collection called employee, and we will create REST services to create, update, delete and search the employee data.

Document Class

The Employee document contains an id, the employee name and the date of birth.

@Documented
case class Employee(_id: String, name: String, dateOfBirth: LocalDate)
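As a small illustration, an Employee value could be built as in the sketch below. The UUID-based _id and the helper name newEmployee are assumptions made for this example; the tutorial does not say how ids are generated.

```scala
import java.time.LocalDate
import java.util.UUID

object EmployeeSketch {
  // Same shape as the tutorial's document class
  case class Employee(_id: String, name: String, dateOfBirth: LocalDate)

  // Hypothetical helper: uses a random UUID string as the document id
  def newEmployee(name: String, dateOfBirth: LocalDate): Employee =
    Employee(UUID.randomUUID().toString, name, dateOfBirth)
}
```

Calling EmployeeSketch.newEmployee("Jane Doe", LocalDate.of(1990, 5, 17)) would give a value that is ready to be passed to an insert.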

Mongo Configuration

Codec Registry

Mongo documents require codec information to marshal and unmarshal the data. These codecs are already written and available here. This repository has codec implementations for most of the available types. So first create the Java codecs as below -

private val javaCodecs = CodecRegistries.fromCodecs(
new LocalDateTimeDateCodec(),
new BigDecimalStringCodec())
private val registry: CodecRegistry = CodecRegistries.fromProviders(classOf[Employee])

In the above code, Employee is our Mongo document class, described in the Document Class section above.
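To make the role of a codec more concrete, here is a minimal sketch of a hand-written one. It is not taken from the tutorial or from the bsoncodec library: the class name LocalDateStringCodec and the choice to store the date as an ISO-8601 string are assumptions for illustration only. It implements the org.bson.codecs.Codec interface that the registry expects.

```scala
import java.time.LocalDate
import org.bson.{BsonReader, BsonWriter}
import org.bson.codecs.{Codec, DecoderContext, EncoderContext}
import org.bson.codecs.configuration.{CodecRegistries, CodecRegistry}

object CodecSketch {
  // Hypothetical codec: stores a LocalDate as an ISO-8601 string such as "1990-05-17"
  class LocalDateStringCodec extends Codec[LocalDate] {
    // Marshal: write the date into the BSON stream as a string
    override def encode(writer: BsonWriter, value: LocalDate, ctx: EncoderContext): Unit =
      writer.writeString(value.toString)

    // Unmarshal: read the string back and parse it into a LocalDate
    override def decode(reader: BsonReader, ctx: DecoderContext): LocalDate =
      LocalDate.parse(reader.readString())

    // Tells the registry which type this codec handles
    override def getEncoderClass: Class[LocalDate] = classOf[LocalDate]
  }

  // A registry holding just this codec; it can be merged with others via fromRegistries
  val registry: CodecRegistry = CodecRegistries.fromCodecs(new LocalDateStringCodec())
}
```

In practice the ready-made codecs are usually the better choice; the sketch only shows what a codec has to provide.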

Database Details

MongoClientSettings is the companion object used to configure the MongoDB connection settings.

private val javaCodecs = CodecRegistries.fromCodecs(
new LocalDateTimeDateCodec(),
new BigDecimalStringCodec())
val user: String = "root"
val password: Array[Char] = "example".toCharArray
val source: String = "admin"
private val credential: MongoCredential = createCredential(user, source, password)
val settings: MongoClientSettings = MongoClientSettings.builder()
.applyToClusterSettings(b => b.hosts(List(new ServerAddress("localhost")).asJava))
.credential(credential)
.codecRegistry(fromRegistries(registry, javaCodecs, DEFAULT_CODEC_REGISTRY))
.build()
val client: MongoClient = MongoClient(settings)
val database: MongoDatabase = client.getDatabase("testdatabase")

In the above snippet, a Mongo client settings object is created that contains the configuration details for connecting to the Mongo database.

Collection Details

Collection details can be obtained from the database representation we got in the previous step:

val employees: MongoCollection[Employee] = database.getCollection("employee")

This step is required for every collection you want to get data from.

JSON Support

We need a formatter to convert the date type to and from JSON. We use Spray JSON for this. Spray JSON provides automatic marshalling and unmarshalling to and from JSON using an in-scope spray-json protocol.

trait JsonUtils extends SprayJsonSupport with DefaultJsonProtocol {
implicit object dateFormatter extends JsonFormat[LocalDate] {
override def write(obj: LocalDate): JsValue = {
JsString(obj.toString)
}
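override def read(json: JsValue): LocalDate = json match {
// Match on JsString: json.toString() would keep the JSON quotes and break LocalDate.parse
case JsString(value) => LocalDate.parse(value, DateTimeFormatter.ISO_DATE)
case other => deserializationError(s"Expected an ISO date string but got $other")
}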
}
implicit val employeeJsonFormatter: RootJsonFormat[Employee] = DefaultJsonProtocol.jsonFormat3(Employee)
implicit val employeeRequestFormat: RootJsonFormat[EmployeeRequest] = jsonFormat2(EmployeeRequest)
}
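A quick round trip shows how such a date format behaves. This is a stand-alone sketch: the object name DateJsonSketch and its two helper methods are made up for illustration. Its read method matches on JsString so that the surrounding JSON quotes never reach LocalDate.parse.

```scala
import java.time.LocalDate
import spray.json._

object DateJsonSketch extends DefaultJsonProtocol {
  // Hypothetical stand-alone format for LocalDate, written as an ISO date string
  implicit object LocalDateFormat extends JsonFormat[LocalDate] {
    override def write(date: LocalDate): JsValue = JsString(date.toString)

    override def read(json: JsValue): LocalDate = json match {
      case JsString(value) => LocalDate.parse(value)
      case other           => deserializationError(s"Expected a date string but got $other")
    }
  }

  // LocalDate -> JSON text, e.g. a quoted ISO date
  def toJsonString(date: LocalDate): String = date.toJson.compactPrint

  // JSON text -> LocalDate
  def fromJsonString(text: String): LocalDate = text.parseJson.convertTo[LocalDate]
}
```

Any non-string JSON value, such as a number, is rejected with a DeserializationException instead of failing deep inside the date parser.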

Repository

The repository contains the create, update, delete and search commands for the employee document. Each command returns a Future, so none of them blocks the calling thread.

def insertData(emp: Employee): Future[Completed] = {
employeeDoc.insertOne(emp).toFuture()
}
def findAll(): Future[Seq[Employee]] = {
employeeDoc.find().toFuture()
}
def update(emp: Employee, id: String):Future[Employee] = {
employeeDoc
.findOneAndUpdate(equal("_id", id),
setBsonValue(emp),
FindOneAndUpdateOptions().upsert(true)).toFuture()
}
def delete(id: String): Future[DeleteResult] = {
employeeDoc.deleteOne(equal("_id", id)).toFuture()
}
private def setBsonValue(emp:Employee): Bson = {
combine(
set("name", emp.name),
set("dateOfBirth",emp.dateOfBirth)
)
}
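Because every repository method returns a Future, callers can chain the calls instead of blocking on each one. The sketch below illustrates this with a hypothetical in-memory stand-in (InMemoryRepository) so that it runs without a database; the method names mirror the repository above, but the return types are simplified and are not those of the Mongo driver.

```scala
import java.time.LocalDate
import scala.collection.concurrent.TrieMap
import scala.concurrent.{ExecutionContext, Future}

object RepositoryUsageSketch {
  case class Employee(_id: String, name: String, dateOfBirth: LocalDate)

  // Hypothetical in-memory stand-in for the Mongo-backed repository
  class InMemoryRepository {
    private val store = TrieMap.empty[String, Employee]

    def insertData(emp: Employee): Future[Unit] =
      Future.successful { store.put(emp._id, emp); () }

    def findAll(): Future[Seq[Employee]] =
      Future.successful(store.values.toSeq)

    // Completes with true when a document with this id existed
    def delete(id: String): Future[Boolean] =
      Future.successful(store.remove(id).isDefined)
  }

  // Insert, then read everything back, chaining the Futures without blocking
  def insertAndList(repo: InMemoryRepository, emp: Employee)(
      implicit ec: ExecutionContext): Future[Seq[Employee]] =
    for {
      _   <- repo.insertData(emp)
      all <- repo.findAll()
    } yield all
}
```

The same for-comprehension shape should carry over to the real repository, since it only relies on the results being Futures.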

Actors to get the user data

The Employee actor receives messages to perform different operations on the employee document. The actor calls the actual service and then sends the response to the sender.

class EmployeeActor extends Actor with ActorLogging {
private val employeeService: EmployeeService = new EmployeeService()
override def receive: Receive = {
case SAVE(employee: EmployeeRequest) =>
log.info(s"received message Save with employee $employee")
sender ! employeeService.saveEmployeeData(employee)
case SEARCH_ALL =>
log.info(s"received message find all")
sender() ! employeeService.findAll
case UPDATE(emp,id) =>
log.info(s"received message update for the id: $id")
sender() ! employeeService.update(emp,id)
case DELETE(id)=>
log.info(s"delete message received for the id: $id")
sender() ! employeeService.delete(id)
case _ =>
log.debug("Unhandled message!")
}
}
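The request/reply flow between a caller and such an actor can be sketched in isolation. Everything named here (CountEmployees, CountingActor, countViaAsk) is hypothetical and only stands in for the tutorial's messages and service; the point is the pattern of replying to sender() and collecting the reply with ask.

```scala
import akka.actor.{Actor, ActorSystem, Props}
import akka.pattern.ask
import akka.util.Timeout
import scala.concurrent.Await
import scala.concurrent.duration._

object AskSketch {
  // Hypothetical message, standing in for SAVE / SEARCH_ALL / UPDATE / DELETE
  case object CountEmployees

  // Hypothetical actor: does its work and replies to whoever asked
  class CountingActor(names: Seq[String]) extends Actor {
    override def receive: Receive = {
      case CountEmployees => sender() ! names.size
    }
  }

  // Starts a throwaway actor system, asks the actor and waits for the reply
  def countViaAsk(names: Seq[String]): Int = {
    val system = ActorSystem("ask-sketch")
    try {
      implicit val timeout: Timeout = Timeout(3.seconds)
      val actor = system.actorOf(Props(new CountingActor(names)))
      val reply = (actor ? CountEmployees).mapTo[Int]
      // Blocking here is only for the demo; server code would keep the Future
      Await.result(reply, 3.seconds)
    } finally {
      system.terminate()
    }
  }
}
```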

Routing Configuration

Path                           Details
POST /api/employee/create      To create employee data
GET /api/employee/search       To search employee data
PUT /api/employee/update       To update employee data
DELETE /api/employee/delete    To delete employee data

The routing code should look like this-

class EmployeeRouteConfig(implicit val system: ActorSystem) extends JsonUtils {
val employeeActor: ActorRef = system.actorOf(Props(new EmployeeActor()))
implicit val mat: ActorMaterializer = ActorMaterializer()
val getRoute: Route =
PathDirectives.pathPrefix("employee") {
concat(
path("create") {
post {
entity(as[EmployeeRequest]) { employee =>
val future = Patterns.ask(employeeActor, SAVE(employee), TimeUtils.timeoutMills)
Await.result(future, TimeUtils.atMostDuration)
RouteDirectives.complete(HttpEntity("Data saved successfully!"))
}
}
},
path("search") {
get {
val resultFuture = Patterns.ask(employeeActor, SEARCH_ALL, TimeUtils.timeoutMills)
val resultSource = Await.result(resultFuture, TimeUtils.atMostDuration).asInstanceOf[Source[Employee, NotUsed]]
val resultByteString = resultSource.map { it => ByteString.apply(it.toJson.toString.getBytes()) }
RouteDirectives.complete(HttpEntity(ContentTypes.`text/plain(UTF-8)`, resultByteString))
}
},
path("update") {
put {
parameter("id") { id =>
entity(as[EmployeeRequest]) { employee =>
val future = Patterns.ask(employeeActor, UPDATE(employee, id), TimeUtils.timeoutMills)
Await.result(future, TimeUtils.atMostDuration)
RouteDirectives.complete(HttpEntity("Data updated successfully!"))
}
}
}
},
path("delete") {
delete {
parameter("id") { id =>
val resultFuture = Patterns.ask(employeeActor, DELETE(id), TimeUtils.timeoutMills)
Await.result(resultFuture, TimeUtils.atMostDuration)
RouteDirectives.complete(HttpEntity("Data deleted successfully!"))
}
}
}
)
}
}
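The routes above wait for each actor reply with Await.result, which blocks a thread per request. One possible alternative, sketched here under assumptions, is akka-http's onSuccess directive, which completes the request when the Future finishes. The saveEmployee function is a hypothetical stand-in for the actor ask, and the name query parameter is used only to keep the example free of JSON setup.

```scala
import akka.http.scaladsl.server.Directives._
import akka.http.scaladsl.server.Route
import scala.concurrent.Future

object NonBlockingRouteSketch {
  // Hypothetical stand-in for Patterns.ask(employeeActor, SAVE(...), ...)
  def saveEmployee(name: String): Future[String] =
    Future.successful(s"Data saved successfully for $name!")

  val route: Route =
    pathPrefix("employee") {
      path("create") {
        post {
          parameter("name") { name =>
            // onSuccess unwraps the Future without blocking the request thread
            onSuccess(saveEmployee(name)) { message =>
              complete(message)
            }
          }
        }
      }
    }
}
```

If the Future fails, onSuccess passes the exception to the route's exception handler, so the failure ends up as an error response rather than a hung request.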

Web Server

This class is used to create the HTTP server and bind the endpoints to it. You can get more information in my previous blog.
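Since the server class itself is not shown here, the following is only a sketch of what binding a route typically looks like with akka-http 10.1.x, the version listed in the dependencies. The object name, the placeholder ping route, and the host and port arguments are all assumptions; the tutorial's server would pass the employee routes instead.

```scala
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.server.Directives._
import akka.http.scaladsl.server.Route
import akka.stream.ActorMaterializer
import scala.concurrent.Future

object WebServerSketch {
  // Hypothetical placeholder route under the /api prefix
  val route: Route =
    pathPrefix("api") {
      path("ping") {
        get { complete("pong") }
      }
    }

  // Binds the route to the given host and port and returns the binding
  def start(host: String, port: Int)(implicit system: ActorSystem): Future[Http.ServerBinding] = {
    implicit val materializer: ActorMaterializer = ActorMaterializer()
    Http().bindAndHandle(route, host, port)
  }
}
```

Calling WebServerSketch.start("localhost", 8080) with an implicit ActorSystem in scope would start the server; the returned binding can later be used to unbind it.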

Run Application

Start the mongodb docker container

docker-compose up -d

Compile the code using below command

sbt compile

Run the application

sbt run 

Now go to the terminal and run the httpie commands below.

Create Employee Data

[Screenshot: create employee]

Search Employee Data

[Screenshot: search employee]

Update Employee Data

Check current data in mongodb

[Screenshot: data in database]

Update the data by id

[Screenshot: update data by id]

Search data after update

[Screenshot: search data in database]

Delete Employee Data

Search employee data

[Screenshot: search data in database]

Delete employee data using id

[Screenshot: delete data in database]

Search data after delete

[Screenshot: search data in database]

Github links

Full code is available here to explore and fork. Feel free to do whatever you want. ;)

Conclusion

We have seen how to use akka-http and akka-streams to create REST services in Scala. You can get more information here -