Client
Building on the protocol module, this is a mpd client library using FS2.
The entry point is MpdClient
which provides ways to send commands
and receive responses.
A MpdClient
can be created like this:
import java.nio.channels.AsynchronousChannelGroup
// import java.nio.channels.AsynchronousChannelGroup
import java.util.concurrent._
// import java.util.concurrent._
import scala.concurrent.ExecutionContext
// import scala.concurrent.ExecutionContext
import cats.effect.IO
// import cats.effect.IO
import mpc4s.client._
// import mpc4s.client._
implicit val EC = ExecutionContext.fromExecutorService(Executors.newCachedThreadPool)
// EC: scala.concurrent.ExecutionContextExecutorService = scala.concurrent.impl.ExecutionContextImpl$$anon$4@7728bed
implicit val ACG = AsynchronousChannelGroup.withThreadPool(EC)
// ACG: java.nio.channels.AsynchronousChannelGroup = sun.nio.ch.EPollPort@c6ca1
val mpc = MpdClient[IO](Connect("localhost", 6600))
// mpc: mpc4s.client.MpdClient[cats.effect.IO] = mpc4s.client.MpdClient$$anon$1@27d8008a
MpdClient
allows to connect
to mpd returning a single element
Stream
of a MpdConnection
. The connection is closed, once the
Stream
terminates, so the connection must be accessed “inside” that
Stream, for example using flatMap
.
Additionally there are send
methods defined for conveniently
issueing commands.
Here is an example to send a simple command:
scala> val resp = mpc.send(Search(Filter.tags(Tag.Album -> "plays"), None, None), 5.seconds)
resp: fs2.Stream[cats.effect.IO,mpc4s.protocol.Response[mpc4s.protocol.answer.SongListAnswer]] = Stream(..)
scala> resp.compile.toVector.unsafeRunSync
res1: Vector[mpc4s.protocol.Response[mpc4s.protocol.answer.SongListAnswer]] = Vector(MpdResult(SongListAnswer(SongList(Vector(Song(Uri(Eike/classic/Ladislav Jelinek/Ladislav Jelinek plays Beethoven/01-Sonata C Major Op53 Waldstein Allegro con brio (L van Beethoven)-Ladislav Jelinek.flac),Some(2018-07-16T18:45:13Z),Some(Seconds(683)),Some(683.0),ListMap(Map(Title -> Sonata C Major Op53 Waldstein, Allegro con brio (L van Beethoven), Composer -> Ludwig van Beethoven, Album -> Ladislav Jelinek plays Beethoven, Track -> 1, Albumartist -> Ladislav Jelinek, Artist -> Ladislav Jelinek, Date -> 2011, Comment -> http://magnatune.com/artists/ladislav_jelinek, Genre -> Classical))), Song(Uri(Eike/classic/Ladislav Jelinek/Ladislav Jelinek plays Beethoven/02-Sonata C Major O...
In this example the codec for the response is chosen at compile
time. When the concrete command is not known at compile time, one can
use a runtime registry. Then a different send
method is used:
scala> val resp = mpc.send1(Search(Filter.tags(Tag.Album -> "plays"), None, None), 5.seconds)
resp: fs2.Stream[cats.effect.IO,mpc4s.protocol.Response[mpc4s.protocol.Answer]] = Stream(..)
scala> resp.compile.toVector.unsafeRunSync
res2: Vector[mpc4s.protocol.Response[mpc4s.protocol.Answer]] = Vector(MpdResult(SongListAnswer(SongList(Vector(Song(Uri(Eike/classic/Ladislav Jelinek/Ladislav Jelinek plays Beethoven/01-Sonata C Major Op53 Waldstein Allegro con brio (L van Beethoven)-Ladislav Jelinek.flac),Some(2018-07-16T18:45:13Z),Some(Seconds(683)),Some(683.0),ListMap(Map(Title -> Sonata C Major Op53 Waldstein, Allegro con brio (L van Beethoven), Composer -> Ludwig van Beethoven, Album -> Ladislav Jelinek plays Beethoven, Track -> 1, Albumartist -> Ladislav Jelinek, Artist -> Ladislav Jelinek, Date -> 2011, Comment -> http://magnatune.com/artists/ladislav_jelinek, Genre -> Classical))), Song(Uri(Eike/classic/Ladislav Jelinek/Ladislav Jelinek plays Beethoven/02-Sonata C Major Op53 Waldstein I...
In the first example, the result type was a concrete anwser type
(Response[SongListAnswer]
), because it could be found at compile
time. In the second example the result type is Response[Answer]
which must be casted to a concrete type first to be useful.
These commands open each a new connection to MPD and so the mpd
commands idle
/noidle
are not allowed. If you want to listen for
events, use the idle
method on MpdClient
.
MPD’s idle mode
When a connection is opened to MPD it expects a command in a certain time window. If that passes, the connection is closed by MPD due to a timeout.
To be notified by MPD events and to reuse a single connection more
efficiently, the idle
command can be used. Read more about this
here.
The idle
method on MpdClient
opens a new connection to MPD and
immediatly sends the idle
command, effectively disabling the server
timeout.
Here is an example. Note that it requires a running MPD to be useful:
import cats.effect.{Effect, IO}
import fs2._
import scala.concurrent.ExecutionContext
import scala.concurrent.duration._
import java.util.concurrent.atomic.AtomicLong
import java.nio.channels.AsynchronousChannelGroup
import java.util.concurrent._
import mpc4s.protocol._
import mpc4s.protocol.commands._
import mpc4s.client._
implicit val EC = ExecutionContext.fromExecutorService(Executors.newCachedThreadPool)
implicit val ACG = AsynchronousChannelGroup.withThreadPool(EC)
val mpc = MpdClient[IO](Connect("localhost", 6600))
def everySecond[F[_]: Effect]: Stream[F, Long] = {
Stream.eval(Effect[F].delay(new AtomicLong(0))).
flatMap(counter => Stream.every(1.seconds).
filter(_ == true).
evalMap(_ => Effect[F].delay(counter.getAndIncrement)))
}
def print: Sink[IO, Response[Answer]] =
_.evalMap(ans => IO(println(s">>>> $ans")))
val connect = Connect("127.0.0.1", 6700)
val read = MpdClient[IO](connect).idle.
flatMap { m =>
val actions = everySecond[IO].
take(6).
evalMap({
case 0 => m.write(CommandOrList(Status))
case 1 => m.write(CommandOrList(Clear))
case 2 => m.write(CommandOrList(SearchAdd(Filter.tags(Tag.Composer -> "Beethoven"))))
case 3 => m.write(CommandOrList(Play(None)))
case 4 => m.write(CommandOrList(Status))
case _ => IO(())
})
actions.concurrently(m.read.to(print))
}
read.compile.drain //.unsafeRunSync // only works if mpd is running
In this example every second a command is send to mpd using a single connection. The same connection is used to read the responses. Both things happen concurrently. At first, the current status is fetched. Then the playlist is cleared and filled with all songs from the database that have a Composer tag including the value “Beethoven”. Then playback is started. And at last, the current status is fetched again.
This prints something like this:
>>>> MpdResult(StatusAnswer(90,false,false,Off,false,1,0,Stop,None,None,None,None,None,None,None,None,None,Some(0.0),None,Some(),Some()))
>>>> MpdResult(Empty)
>>>> MpdResult(IdleAnswer(Vector(ChangeEvent(Playlist))))
>>>> MpdResult(Empty)
>>>> MpdResult(IdleAnswer(Vector(ChangeEvent(Playlist))))
>>>> MpdResult(Empty)
>>>> MpdResult(IdleAnswer(Vector(ChangeEvent(Player))))
>>>> MpdResult(StatusAnswer(90,false,false,Off,false,3,254,Play,Some(0),Some(Id(1)),Some(1),Some(Id(2)),Some(Range(1,407)),Some(1.137),Some(406.8),Some(198),None,Some(0.0),Some(AudioFormat(44100,16,2)),Some(),Some()))
>>>> MpdResult(IdleAnswer(Vector(ChangeEvent(Player))))
The code above sent 5 commands, but 9 responses have been
recorded. The reason is that MPD sent events about state changes
through this connection – the purpose of the idle
command. The state
changes were caused by the commands themselves.
You can see a more realistic use case in the http module where this is used to back the websockets connection.
Dependencies
This module has the following dependencies: