Fix closeable streams to recreate streams for a key if the delegate was closed.

This commit is contained in:
Mark Harrah 2013-12-06 20:43:48 -05:00
parent 21a05978de
commit a677448a6c
1 changed files with 21 additions and 11 deletions

View File

@ -16,10 +16,10 @@ sealed trait TaskStreams[Key]
def default = outID
def outID = "out"
def errorID = "err"
def readText(key: Key, sid: String = default): BufferedReader
def readBinary(a: Key, sid: String = default): BufferedInputStream
final def readText(a: Key, sid: Option[String]): BufferedReader = readText(a, getID(sid))
final def readBinary(a: Key, sid: Option[String]): BufferedInputStream = readBinary(a, getID(sid))
@ -31,13 +31,14 @@ sealed trait TaskStreams[Key]
// default logger
final lazy val log: Logger = log(default)
def log(sid: String): Logger
private[this] def getID(s: Option[String]) = s getOrElse default
}
trait ManagedStreams[Key] extends TaskStreams[Key]
sealed trait ManagedStreams[Key] extends TaskStreams[Key]
{
def open()
def close()
def isClosed: Boolean
}
trait Streams[Key]
@ -59,24 +60,32 @@ object Streams
private[this] val streams = new collection.mutable.HashMap[Key,ManagedStreams[Key]]
def apply(key: Key): ManagedStreams[Key] =
synchronized { streams.getOrElseUpdate(key, delegate(key)) }
synchronized {
streams.get(key) match {
case Some(s) if !s.isClosed => s
case _ =>
val newS = delegate(key)
streams.put(key, newS)
newS
}
}
def close(): Unit =
synchronized { streams.values.foreach(_.close() ); streams.clear() }
}
def apply[Key](taskDirectory: Key => File, name: Key => String, mkLogger: (Key, PrintWriter) => Logger): Streams[Key] = new Streams[Key] {
def apply(a: Key): ManagedStreams[Key] = new ManagedStreams[Key] {
private[this] var opened: List[Closeable] = Nil
private[this] var closed = false
def readText(a: Key, sid: String = default): BufferedReader =
make(a, sid)(f => new BufferedReader(new InputStreamReader(new FileInputStream(f), IO.defaultCharset)) )
def readBinary(a: Key, sid: String = default): BufferedInputStream =
make(a, sid)(f => new BufferedInputStream(new FileInputStream(f)))
def text(sid: String = default): PrintWriter =
make(a, sid)(f => new PrintWriter(new DeferredWriter(new BufferedWriter(new OutputStreamWriter(new FileOutputStream(f), IO.defaultCharset)))) )
@ -102,7 +111,8 @@ object Streams
def key: Key = a
def open() {}
def isClosed: Boolean = synchronized { closed }
def close(): Unit = synchronized {
if(!closed)
{