Scala Mongo example with akka-http and akka-stream
Introduction
In this tutorial we shall create a CRUD web application in Scala. We will use the MongoDB Scala Driver, the officially supported Scala driver for MongoDB, to communicate with the Mongo database. Along with that, we shall create a REST API using the akka-http and akka-stream modules.
Prerequisites
- Scala version: 2.13.1
- Database: Mongo:3.4.23-xenial
Other dependencies
libraryDependencies ++= Seq(
  "com.typesafe.akka" %% "akka-actor" % "2.5.26",
  "com.typesafe.akka" %% "akka-http" % "10.1.11",
  "com.typesafe.akka" %% "akka-stream" % "2.5.26",
  "ch.rasc" % "bsoncodec" % "1.0.1",
  "org.mongodb.scala" %% "mongo-scala-driver" % "2.7.0",
  "com.typesafe.akka" %% "akka-http-spray-json" % "10.1.11"
)
About the service/project
The application should be able to perform CRUD operations on a MongoDB document and expose REST APIs to access and modify the data. The example is an employee management service: employee details are stored in a collection called employee. We will create REST services to create, update, delete and search the employee data.
Document Class
Employee document contains employee name and date of birth.
@Documented
case class Employee(_id: String, name: String, dateOfBirth: LocalDate)
Mongo Configuration
Codec Registry
Mongo documents require codec information to marshal and unmarshal the data. These codecs are already written and available here. This repository has codec implementations for most of the available types. So first create the Java codecs and the registry for the document class as below:
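// Codecs for Java types, taken from the bsoncodec library
private val javaCodecs = CodecRegistries.fromCodecs(
  new LocalDateTimeDateCodec(),
  new BigDecimalStringCodec())
// Macro-generated codec for the case class; needs import org.mongodb.scala.bson.codecs.Macros._
private val registry: CodecRegistry = CodecRegistries.fromProviders(classOf[Employee])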
In the above code, Employee is the Mongo document class shown in the Document Class section.
Database Details
MongoClientSettings provides the builder used to configure the MongoDB client settings.
private val javaCodecs = CodecRegistries.fromCodecs(
  new LocalDateTimeDateCodec(),
  new BigDecimalStringCodec())
val user: String = "root"
val password: Array[Char] = "example".toCharArray
val source: String = "admin"
private val credential: MongoCredential = createCredential(user, source, password)
val settings: MongoClientSettings = MongoClientSettings.builder()
  .applyToClusterSettings(b => b.hosts(List(new ServerAddress("localhost")).asJava))
  .credential(credential)
  .codecRegistry(fromRegistries(registry, javaCodecs, DEFAULT_CODEC_REGISTRY))
  .build()
val client: MongoClient = MongoClient(settings)
val database: MongoDatabase = client.getDatabase("testdatabase")
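In the above snippet, a MongoClientSettings object is created that holds the configuration details for connecting to the Mongo database: the host, the credentials and the codec registry. The client and the database handle are then created from it.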
Collection Details
The collection can be obtained from the database object we got in the previous step:
val employees: MongoCollection[Employee] = database.getCollection("employee")
This step is required for every collection you want to read from or write to.
JSON Support
We need a formatter to convert the date type to and from JSON. We use Spray JSON for this. Spray JSON provides automatic marshalling and unmarshalling to and from JSON using an in-scope spray-json protocol.
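// Needs import spray.json._ for JsString, JsValue and deserializationError
trait JsonUtils extends SprayJsonSupport with DefaultJsonProtocol {
  implicit object dateFormatter extends JsonFormat[LocalDate] {
    // Dates are written as ISO-8601 strings, e.g. "1990-05-17"
    override def write(obj: LocalDate): JsValue = {
      JsString(obj.toString)
    }
    // Match on JsString to read the raw text; json.toString() would keep the surrounding quotes
    override def read(json: JsValue): LocalDate = json match {
      case JsString(value) => LocalDate.parse(value, DateTimeFormatter.ISO_DATE)
      case other => deserializationError(s"Expected an ISO date string, got $other")
    }
  }
  implicit val employeeJsonFormatter: RootJsonFormat[Employee] = DefaultJsonProtocol.jsonFormat3(Employee)
  implicit val employeeRequestFormat: RootJsonFormat[EmployeeRequest] = jsonFormat2(EmployeeRequest)
}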
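Dates travel as ISO-8601 strings in this format, so the read side has to work on the raw string content rather than on the JSON rendering of the value, which keeps the surrounding quotes. The sketch below uses only java.time to show the round trip. The DateRoundTrip object and its method names are hypothetical and not part of the article's code.

```scala
import java.time.LocalDate
import java.time.format.DateTimeFormatter
import scala.util.Try

// Hypothetical helper, for illustration only.
object DateRoundTrip {
  // Render a date as ISO-8601 text, the same text the formatter's write side produces.
  def render(date: LocalDate): String = date.toString

  // Parse ISO-8601 text back into a date; Try captures malformed input
  // instead of letting the parse exception escape.
  def parse(text: String): Try[LocalDate] =
    Try(LocalDate.parse(text, DateTimeFormatter.ISO_DATE))
}
```

Parsing the rendered text of a date gives the same date back, while text that still carries JSON quotes, such as "\"1990-05-17\"", ends up as a Failure.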
Repository
The repository contains the create, update, delete and search commands for the employee document.
def insertData(emp: Employee): Future[Completed] = {
  employeeDoc.insertOne(emp).toFuture()
}
def findAll(): Future[Seq[Employee]] = {
  employeeDoc.find().toFuture()
}
def update(emp: Employee, id: String): Future[Employee] = {
  employeeDoc
    .findOneAndUpdate(equal("_id", id),
      setBsonValue(emp),
      FindOneAndUpdateOptions().upsert(true)).toFuture()
}
def delete(id: String): Future[DeleteResult] = {
  employeeDoc.deleteOne(equal("_id", id)).toFuture()
}
private def setBsonValue(emp: Employee): Bson = {
  combine(
    set("name", emp.name),
    set("dateOfBirth", emp.dateOfBirth)
  )
}
Actors to get the user data
The employee actor receives messages to perform different operations on the employee document. The actor calls the actual service and then sends the response to the sender.
class EmployeeActor extends Actor with ActorLogging {
  private val employeeService: EmployeeService = new EmployeeService()
  override def receive: Receive = {
    case SAVE(employee: EmployeeRequest) =>
      log.info(s"received message Save with employee $employee")
      sender() ! employeeService.saveEmployeeData(employee)
    case SEARCH_ALL =>
      log.info("received message find all")
      sender() ! employeeService.findAll
    case UPDATE(emp, id) =>
      log.info(s"received message update for the id: $id")
      sender() ! employeeService.update(emp, id)
    case DELETE(id) =>
      log.info(s"delete message received for the id: $id")
      sender() ! employeeService.delete(id)
    case _ =>
      log.debug("Unhandled message!")
  }
}
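The actor and the routes refer to several types the article does not show: EmployeeRequest and the SAVE, SEARCH_ALL, UPDATE and DELETE messages. A plausible sketch of their shape is given below, inferred only from how they are used. The field names of EmployeeRequest, the EmployeeMessage trait and the EmployeeMapper helper are assumptions, not the author's code.

```scala
import java.time.LocalDate
import java.util.UUID

// Document class as defined in the Document Class section.
final case class Employee(_id: String, name: String, dateOfBirth: LocalDate)

// Assumed shape: two fields, which is what jsonFormat2(EmployeeRequest) implies.
final case class EmployeeRequest(name: String, dateOfBirth: LocalDate)

// Assumed message protocol, named after the cases the actor matches on.
sealed trait EmployeeMessage
final case class SAVE(employee: EmployeeRequest) extends EmployeeMessage
case object SEARCH_ALL extends EmployeeMessage
final case class UPDATE(employee: EmployeeRequest, id: String) extends EmployeeMessage
final case class DELETE(id: String) extends EmployeeMessage

// Hypothetical helper: turn an incoming request into a document,
// generating a random UUID string for the _id field.
object EmployeeMapper {
  def toDocument(request: EmployeeRequest): Employee =
    Employee(UUID.randomUUID().toString, request.name, request.dateOfBirth)
}
```

Keeping the request type separate from the document type means clients never have to supply the _id themselves. A sealed trait also lets the compiler warn about unhandled messages wherever the match is typed against it.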
Routing Configuration
Path | Details |
---|---|
POST /api/employee/create | To create employee data |
GET /api/employee/search | To search employee data |
PUT /api/employee/update?id={id} | To update employee data by id |
DELETE /api/employee/delete?id={id} | To delete employee data by id |
The routing code should look like this:
class EmployeeRouteConfig(implicit val system: ActorSystem) extends JsonUtils {
  val employeeActor: ActorRef = system.actorOf(Props(new EmployeeActor()))
  implicit val mat: ActorMaterializer = ActorMaterializer()
  val getRoute: Route =
    PathDirectives.pathPrefix("employee") {
      concat(
        path("create") {
          post {
            entity(as[EmployeeRequest]) { employee =>
              val future = Patterns.ask(employeeActor, SAVE(employee), TimeUtils.timeoutMills)
              Await.result(future, TimeUtils.atMostDuration)
              RouteDirectives.complete(HttpEntity("Data saved successfully!"))
            }
          }
        },
        path("search") {
          get {
            val resultFuture = Patterns.ask(employeeActor, SEARCH_ALL, TimeUtils.timeoutMills)
            val resultSource = Await.result(resultFuture, TimeUtils.atMostDuration).asInstanceOf[Source[Employee, NotUsed]]
            // Stream each employee to the client as a JSON-encoded chunk
            val resultByteString = resultSource.map { it => ByteString.apply(it.toJson.toString.getBytes()) }
            RouteDirectives.complete(HttpEntity(ContentTypes.`text/plain(UTF-8)`, resultByteString))
          }
        },
        path("update") {
          put {
            parameter("id") { id =>
              entity(as[EmployeeRequest]) { employee =>
                val future = Patterns.ask(employeeActor, UPDATE(employee, id), TimeUtils.timeoutMills)
                Await.result(future, TimeUtils.atMostDuration)
                RouteDirectives.complete(HttpEntity("Data updated successfully!"))
              }
            }
          }
        },
        path("delete") {
          delete {
            parameter("id") { id =>
              val resultFuture = Patterns.ask(employeeActor, DELETE(id), TimeUtils.timeoutMills)
              Await.result(resultFuture, TimeUtils.atMostDuration)
              RouteDirectives.complete(HttpEntity("Data deleted successfully!"))
            }
          }
        }
      )
    }
}
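The routes above block on each actor reply with Await.result, which ties up a request-handling thread for the duration of the call. A minimal sketch of a non-blocking variant of the delete endpoint follows, using akka-http's onComplete directive together with the ask pattern. The object name NonBlockingRoutes, the five-second timeout and the toMessage parameter (a stand-in for building the article's DELETE(id) message) are assumptions made for illustration. The sketch only reports whether the actor replied in time, not what the reply contained.

```scala
import akka.actor.ActorRef
import akka.http.scaladsl.model.StatusCodes
import akka.http.scaladsl.server.Directives._
import akka.http.scaladsl.server.Route
import akka.pattern.ask
import akka.util.Timeout
import scala.concurrent.duration._
import scala.util.{Failure, Success}

// Hypothetical object, for illustration only.
object NonBlockingRoutes {
  // Assumed timeout; the values behind the article's TimeUtils are not shown.
  implicit val timeout: Timeout = Timeout(5.seconds)

  // toMessage builds the message sent to the actor, e.g. id => DELETE(id).
  def deleteRoute(employeeActor: ActorRef, toMessage: String => Any): Route =
    path("delete") {
      delete {
        parameter("id") { id =>
          // onComplete hands the outcome of the Future to the inner route
          // once it is available, without blocking a thread in the meantime.
          onComplete(employeeActor ? toMessage(id)) {
            case Success(_) =>
              complete("Data deleted successfully!")
            case Failure(ex) =>
              complete(StatusCodes.InternalServerError, s"Delete failed: ${ex.getMessage}")
          }
        }
      }
    }
}
```

The same shape would apply to the create and update endpoints. An ask that times out lands in the Failure branch and becomes a 500 response instead of an exception thrown inside the route.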
Web Server
This class is used to create the HTTP server and bind the endpoints to it. You can get more information in my previous blog.
Run Application
Start the mongodb docker container
docker-compose up -d
Compile the code using the command below
sbt compile
Run the application
sbt run
Now go to the terminal and run the httpie commands below
Create Employee Data
Search Employee Data
Update Employee Data
Check current data in mongodb
Update the data by id
Search data after update
Delete Employee data
Search employee data
Delete employee data using id
Search data after delete
Github links
Full code is available here to explore and fork. Feel free to do whatever you want. ;)
Conclusion
We have seen here how to use akka-http and akka-stream to create REST services in Scala. You can get more information here -