diff --git a/roboquant-alpaca/src/main/kotlin/org/roboquant/alpaca/Alpaca.kt b/roboquant-alpaca/src/main/kotlin/org/roboquant/alpaca/Alpaca.kt index ae6c6f48..07f09b09 100644 --- a/roboquant-alpaca/src/main/kotlin/org/roboquant/alpaca/Alpaca.kt +++ b/roboquant-alpaca/src/main/kotlin/org/roboquant/alpaca/Alpaca.kt @@ -19,6 +19,7 @@ package org.roboquant.alpaca import net.jacobpeterson.alpaca.AlpacaAPI import net.jacobpeterson.alpaca.model.util.apitype.MarketDataWebsocketSourceType import net.jacobpeterson.alpaca.model.util.apitype.TraderAPIEndpointType +import net.jacobpeterson.alpaca.openapi.marketdata.model.StockFeed import org.roboquant.common.Config import org.roboquant.common.Exchange @@ -48,6 +49,7 @@ data class AlpacaConfig( var secretKey: String = Config.getProperty("alpaca.secret.key", ""), var accountType: AccountType = AccountType.PAPER, var dataType: DataType = DataType.IEX, + var stockFeed: StockFeed = StockFeed.IEX, var extendedHours: Boolean = Config.getProperty("alpaca.extendedhours", false), ) diff --git a/roboquant-alpaca/src/main/kotlin/org/roboquant/alpaca/AlpacaBroker.kt b/roboquant-alpaca/src/main/kotlin/org/roboquant/alpaca/AlpacaBroker.kt index 291072e0..18b83d02 100644 --- a/roboquant-alpaca/src/main/kotlin/org/roboquant/alpaca/AlpacaBroker.kt +++ b/roboquant-alpaca/src/main/kotlin/org/roboquant/alpaca/AlpacaBroker.kt @@ -126,12 +126,13 @@ class AlpacaBroker( */ private fun syncOrders() { _account.orders.forEach { - val aOrderId = orderPlacer.get(it.order) - if (aOrderId != null) { + if (it.open) { + val aOrderId = it.order.id + logger.info { "open order id=$aOrderId" } val alpacaOrder = alpacaAPI.trader().orders().getOrderByOrderID(UUID.fromString(aOrderId), false) updateIAccountOrder(it.order, alpacaOrder) } else { - logger.warn("cannot find order ${it.order} in orderMap") + logger.warn("cannot find order=${it.order} in orderMap") } } } diff --git a/roboquant-alpaca/src/main/kotlin/org/roboquant/alpaca/AlpacaHistoricFeed.kt b/roboquant-alpaca/src/main/kotlin/org/roboquant/alpaca/AlpacaHistoricFeed.kt index 8836fcc5..bce08734 100644 --- a/roboquant-alpaca/src/main/kotlin/org/roboquant/alpaca/AlpacaHistoricFeed.kt +++ b/roboquant-alpaca/src/main/kotlin/org/roboquant/alpaca/AlpacaHistoricFeed.kt @@ -21,7 +21,6 @@ import net.jacobpeterson.alpaca.openapi.marketdata.api.StockApi import net.jacobpeterson.alpaca.openapi.marketdata.model.Sort import net.jacobpeterson.alpaca.openapi.marketdata.model.StockAdjustment import net.jacobpeterson.alpaca.openapi.marketdata.model.StockBar -import net.jacobpeterson.alpaca.openapi.marketdata.model.StockFeed import org.roboquant.common.Asset import org.roboquant.common.Logging import org.roboquant.common.TimeSpan @@ -41,7 +40,7 @@ class AlpacaHistoricFeed( configure: AlpacaConfig.() -> Unit = {} ) : HistoricPriceFeed() { - private val limit = 10_000L + // private val limit = 10_000L private val config = AlpacaConfig() private val stockData: StockApi private val alpacaAPI: AlpacaAPI @@ -69,7 +68,7 @@ class AlpacaHistoricFeed( var nextPageToken: String? = null do { val resp = stockData.stockQuotes( - symbols, start, end, limit, "", StockFeed.IEX, "USD", nextPageToken, Sort.ASC + symbols, start, end, null, "", config.stockFeed, "USD", nextPageToken, Sort.ASC ) for ((symbol, quotes) in resp.quotes) { val asset = Asset(symbol) @@ -100,7 +99,7 @@ class AlpacaHistoricFeed( var nextPageToken: String? = null do { val resp = stockData.stockTrades( - symbols, start, end, limit, "", StockFeed.IEX, "USD", nextPageToken, Sort.ASC + symbols, start, end, null, "", config.stockFeed, "USD", nextPageToken, Sort.ASC ) for ((symbol, trades) in resp.trades) { val asset = Asset(symbol) @@ -120,8 +119,8 @@ class AlpacaHistoricFeed( val asset = Asset(symbol) for (bar in bars) { val action = PriceBar(asset, bar.o, bar.h, bar.l, bar.c, bar.v.toDouble(), timeSpan) - val now = bar.t.toInstant() - add(now, action) + val time = bar.t.toInstant() + add(time, action) } } @@ -131,7 +130,8 @@ class AlpacaHistoricFeed( fun retrieveStockPriceBars( symbols: String, timeframe: Timeframe, - frequency: String = "1Day" + frequency: String = "1Day", + adjustment: StockAdjustment = StockAdjustment.ALL ) { val (start, end) = toOffset(timeframe) @@ -142,17 +142,16 @@ class AlpacaHistoricFeed( frequency, start, end, - limit, - StockAdjustment.ALL, - "", - StockFeed.IEX, + null, + adjustment, + null, + config.stockFeed, "USD", nextPageToken, Sort.ASC ) for ((symbol, bars) in resp.bars) { processBars(symbol, bars, null) - } nextPageToken = resp.nextPageToken } while (nextPageToken != null) diff --git a/roboquant-alpaca/src/main/kotlin/org/roboquant/alpaca/AlpacaLiveFeed.kt b/roboquant-alpaca/src/main/kotlin/org/roboquant/alpaca/AlpacaLiveFeed.kt index a076c9a3..2cba83ef 100644 --- a/roboquant-alpaca/src/main/kotlin/org/roboquant/alpaca/AlpacaLiveFeed.kt +++ b/roboquant-alpaca/src/main/kotlin/org/roboquant/alpaca/AlpacaLiveFeed.kt @@ -17,9 +17,14 @@ package org.roboquant.alpaca import net.jacobpeterson.alpaca.AlpacaAPI +import net.jacobpeterson.alpaca.model.websocket.marketdata.streams.crypto.model.bar.CryptoBarMessage +import net.jacobpeterson.alpaca.model.websocket.marketdata.streams.crypto.model.quote.CryptoQuoteMessage +import net.jacobpeterson.alpaca.model.websocket.marketdata.streams.crypto.model.trade.CryptoTradeMessage import net.jacobpeterson.alpaca.model.websocket.marketdata.streams.stock.model.bar.StockBarMessage import net.jacobpeterson.alpaca.model.websocket.marketdata.streams.stock.model.quote.StockQuoteMessage import net.jacobpeterson.alpaca.model.websocket.marketdata.streams.stock.model.trade.StockTradeMessage +import net.jacobpeterson.alpaca.websocket.marketdata.streams.crypto.CryptoMarketDataListenerAdapter +import net.jacobpeterson.alpaca.websocket.marketdata.streams.crypto.CryptoMarketDataWebsocketInterface import net.jacobpeterson.alpaca.websocket.marketdata.streams.stock.StockMarketDataListenerAdapter import net.jacobpeterson.alpaca.websocket.marketdata.streams.stock.StockMarketDataWebsocketInterface import org.roboquant.common.Asset @@ -67,8 +72,6 @@ class AlpacaLiveFeed( private val config = AlpacaConfig() private val alpacaAPI: AlpacaAPI private val logger = Logging.getLogger(AlpacaLiveFeed::class) - private val listener = createStockHandler() - init { config.configure() @@ -83,7 +86,7 @@ class AlpacaLiveFeed( */ private fun connect() { connectMarket(alpacaAPI.stockMarketDataStream()) - // connectMarket(alpacaAPI.cryptoMarketDataStream()) + connectCryptoMarket(alpacaAPI.cryptoMarketDataStream()) } /** @@ -98,7 +101,25 @@ class AlpacaLiveFeed( if (!connection.isValid) { throw ConfigurationException("couldn't establish $connection") } else { - connection.setListener(listener) + val stockListener = createStockHandler() + connection.setListener(stockListener) + } + } + + /** + * Connect to ta market data provider and start listening. This can be the stocks or crypto market data feeds. + */ + private fun connectCryptoMarket(connection: CryptoMarketDataWebsocketInterface) { + require(!connection.isConnected) { "already connected, disconnect first" } + val timeoutMillis: Long = 5_000 + connection.setAutomaticallyReconnect(true) + connection.connect() + connection.waitForAuthorization(timeoutMillis, TimeUnit.MILLISECONDS) + if (!connection.isValid) { + throw ConfigurationException("couldn't establish $connection") + } else { + val cryptoListener = createCryptoHandler() + connection.setListener(cryptoListener) } } @@ -125,12 +146,11 @@ class AlpacaLiveFeed( * Subscribe to stock market data based on the passed [symbols] and [type] */ fun subscribeStocks(vararg symbols: String, type: PriceActionType = PriceActionType.PRICE_BAR) { - // validateSymbols(symbols, availableStocksMap) - val s = symbols.toList() + val s = symbols.toSet() when (type) { - PriceActionType.TRADE -> alpacaAPI.stockMarketDataStream().tradeSubscriptions.addAll(s) - PriceActionType.QUOTE -> alpacaAPI.stockMarketDataStream().quoteSubscriptions.addAll(s) - PriceActionType.PRICE_BAR -> alpacaAPI.stockMarketDataStream().minuteBarSubscriptions.addAll(s) + PriceActionType.TRADE -> alpacaAPI.stockMarketDataStream().tradeSubscriptions= s + PriceActionType.QUOTE -> alpacaAPI.stockMarketDataStream().quoteSubscriptions= s + PriceActionType.PRICE_BAR -> alpacaAPI.stockMarketDataStream().minuteBarSubscriptions= s } } @@ -139,12 +159,12 @@ class AlpacaLiveFeed( */ @Suppress("unused") fun subscribeCrypto(vararg symbols: String, type: PriceActionType = PriceActionType.PRICE_BAR) { - // validateSymbols(symbols, availableCryptoMap) - val s = symbols.toList() + + val s = symbols.toSet() when (type) { - PriceActionType.TRADE -> alpacaAPI.cryptoMarketDataStream().tradeSubscriptions.addAll(s) - PriceActionType.QUOTE -> alpacaAPI.cryptoMarketDataStream().quoteSubscriptions.addAll(s) - PriceActionType.PRICE_BAR -> alpacaAPI.cryptoMarketDataStream().minuteBarSubscriptions.addAll(s) + PriceActionType.TRADE -> alpacaAPI.cryptoMarketDataStream().tradeSubscriptions=s + PriceActionType.QUOTE -> alpacaAPI.cryptoMarketDataStream().quoteSubscriptions=s + PriceActionType.PRICE_BAR -> alpacaAPI.cryptoMarketDataStream().minuteBarSubscriptions=s } } @@ -192,6 +212,47 @@ class AlpacaLiveFeed( } + } + } + + private fun createCryptoHandler(): CryptoMarketDataListenerAdapter { + return object : CryptoMarketDataListenerAdapter() { + + override fun onTrade(trade: CryptoTradeMessage) { + val asset = Asset(trade.symbol) + val item = TradePrice(asset, trade.price) + val time = trade.timestamp.toInstant() + send(time, item) + } + + override fun onQuote(quote: CryptoQuoteMessage) { + val asset = Asset(quote.symbol) + val item = PriceQuote( + asset, + quote.askPrice, + quote.askSize.toDouble(), + quote.bidPrice, + quote.bidSize.toDouble(), + ) + val time = quote.timestamp.toInstant() + send(time, item) + } + + override fun onMinuteBar(bar: CryptoBarMessage) { + val asset = Asset(bar.symbol) + val item = PriceBar( + asset, + bar.open, + bar.high, + bar.low, + bar.close, + bar.tradeCount.toDouble() + ) + val time = bar.timestamp.toInstant() + send(time, item) + } + + } } } diff --git a/roboquant-alpaca/src/test/kotlin/org/roboquant/samples/AlpacaSamples.kt b/roboquant-alpaca/src/test/kotlin/org/roboquant/samples/AlpacaSamples.kt index b3790bf7..f9c4ae96 100644 --- a/roboquant-alpaca/src/test/kotlin/org/roboquant/samples/AlpacaSamples.kt +++ b/roboquant-alpaca/src/test/kotlin/org/roboquant/samples/AlpacaSamples.kt @@ -119,18 +119,12 @@ internal class AlpacaSamples { @Test @Ignore internal fun alpacaHistoricFeed3() { - val feed = AlpacaHistoricFeed() // We get minute data - val tf = Timeframe.parse("2024-01-04", "2024-01-05") + val tf = Timeframe.parse("2016-01-01", "2024-05-05") feed.retrieveStockPriceBars("AAPL", timeframe = tf, "1Min") - val events = feed.toList() - - with(events) { - println("events=$size start=${first().time} last=${last().time} symbols=${feed.assets.symbols.toList()}") - } - + println(feed) } @Test @@ -141,7 +135,7 @@ internal class AlpacaSamples { } val order = MarketOrder(Asset("IBM"), Size.ONE) broker.place(listOf(order)) - Thread.sleep(5000) + Thread.sleep(10_000) val account = broker.sync() println(account.fullSummary()) } diff --git a/roboquant-charts/src/test/kotlin/org/roboquant/charts/TestData.kt b/roboquant-charts/src/test/kotlin/org/roboquant/charts/TestData.kt index 32404c5e..aa39853b 100644 --- a/roboquant-charts/src/test/kotlin/org/roboquant/charts/TestData.kt +++ b/roboquant-charts/src/test/kotlin/org/roboquant/charts/TestData.kt @@ -26,7 +26,7 @@ import org.roboquant.common.Size import org.roboquant.common.USD import org.roboquant.feeds.random.RandomWalkFeed import org.roboquant.feeds.util.HistoricTestFeed -import org.roboquant.journals.MetricsJournal +import org.roboquant.journals.MemoryJournal import org.roboquant.metrics.AccountMetric import org.roboquant.orders.MarketOrder import org.roboquant.strategies.EMAStrategy @@ -66,7 +66,7 @@ object TestData { val data by lazy { val feed = HistoricTestFeed(50..150) - val journal = MetricsJournal(AccountMetric()) + val journal = MemoryJournal(AccountMetric()) org.roboquant.run(feed, EMAStrategy(), journal) journal.getMetric("account.equity") } diff --git a/roboquant-questdb/src/main/kotlin/org/roboquant/questdb/QuestDBJournal.kt b/roboquant-questdb/src/main/kotlin/org/roboquant/questdb/QuestDBJournal.kt index 08fd785d..f4e81f18 100644 --- a/roboquant-questdb/src/main/kotlin/org/roboquant/questdb/QuestDBJournal.kt +++ b/roboquant-questdb/src/main/kotlin/org/roboquant/questdb/QuestDBJournal.kt @@ -28,7 +28,7 @@ import org.roboquant.common.Logging import org.roboquant.common.Observation import org.roboquant.common.TimeSeries import org.roboquant.feeds.Event -import org.roboquant.journals.Journal +import org.roboquant.journals.MetricsJournal import org.roboquant.metrics.Metric import org.roboquant.orders.Order import org.roboquant.strategies.Signal @@ -49,7 +49,7 @@ class QuestDBJournal( workers: Int = 1, private val partition: String = QuestDBRecorder.NONE, private val truncate: Boolean = false -) : Journal { +) : MetricsJournal { private val logger = Logging.getLogger(this::class) private var engine: CairoEngine @@ -60,19 +60,34 @@ class QuestDBJournal( engine = getEngine(dbPath) ctx = SqlExecutionContextImpl(engine, workers) createTable(table) + logger.info { "db=$dbPath table=$table" } } companion object { + private var engines = mutableMapOf() + + @Synchronized fun getEngine(dbPath: Path): CairoEngine { - if (Files.notExists(dbPath)) { - Files.createDirectories(dbPath) + if (dbPath !in engines) { + if (Files.notExists(dbPath)) { + Files.createDirectories(dbPath) + } + require(dbPath.isDirectory()) { "dbPath needs to be a directory" } + val config = DefaultCairoConfiguration(dbPath.toString()) + engines[dbPath] = CairoEngine(config) } - require(dbPath.isDirectory()) { "dbPath needs to be a directory" } - val config = DefaultCairoConfiguration(dbPath.toString()) - val engine = CairoEngine(config) - return engine + return engines.getValue(dbPath) + } + + fun getRuns(dbPath: Path): Set { + val engine = getEngine(dbPath) + return engine.tables().toSet() + } + + fun close(dbPath: Path) { + engines[dbPath]?.close() } } @@ -88,10 +103,10 @@ class QuestDBJournal( /** * Get a metric for a specific [table] */ - fun getMetric(metricName: String): TimeSeries { + override fun getMetric(name: String): TimeSeries { val result = mutableListOf() - engine.query("select time, value from '$table' where metric='$metricName'") { + engine.query("select time, value from '$table' where metric='$name'") { while (hasNext()) { val r = this.record val o = Observation(ofEpochMicro(r.getTimestamp(0)), r.getDouble(1)) @@ -112,8 +127,8 @@ class QuestDBJournal( } - fun getMetricNames(run: String): Set { - return engine.distictSymbol(run, "name").toSortedSet() + override fun getMetricNames(): Set { + return engine.distictSymbol(table, "name").toSortedSet() } /** @@ -142,10 +157,10 @@ class QuestDBJournal( } /** - * Close the engine and context + * Close the underlying context */ fun close() { - engine.close() + // engine.close() ctx.close() } diff --git a/roboquant-questdb/src/test/kotlin/org/roboquant/questdb/QuestDBJournalTestIT.kt b/roboquant-questdb/src/test/kotlin/org/roboquant/questdb/QuestDBJournalTestIT.kt index ad10af30..6014059b 100644 --- a/roboquant-questdb/src/test/kotlin/org/roboquant/questdb/QuestDBJournalTestIT.kt +++ b/roboquant-questdb/src/test/kotlin/org/roboquant/questdb/QuestDBJournalTestIT.kt @@ -16,13 +16,18 @@ package org.roboquant.questdb +import kotlinx.coroutines.runBlocking import org.junit.jupiter.api.io.TempDir +import org.roboquant.common.ParallelJobs +import org.roboquant.common.years import org.roboquant.feeds.random.RandomWalkFeed import org.roboquant.journals.Journal +import org.roboquant.journals.MultiRunJournal import org.roboquant.metrics.AccountMetric import org.roboquant.strategies.EMAStrategy import java.io.File import kotlin.test.Test +import kotlin.test.assertEquals import kotlin.test.assertTrue internal class QuestDBJournalTestIT { @@ -30,6 +35,8 @@ internal class QuestDBJournalTestIT { @TempDir lateinit var folder: File + @TempDir + lateinit var folder2: File private fun simpleRun(journal: Journal) { val feed = RandomWalkFeed.lastYears(1) @@ -47,4 +54,23 @@ internal class QuestDBJournalTestIT { logger.close() } + @Test + fun parallel() = runBlocking{ + val mrj = MultiRunJournal { + run -> QuestDBJournal(AccountMetric(), dbPath = folder2.toPath(), table=run) + } + val feed = RandomWalkFeed.lastYears(5) + val jobs = ParallelJobs() + val tfs = feed.timeframe.split(1.years) + for (tf in tfs) { + jobs.add { + val journal = mrj.getJournal() + org.roboquant.runAsync(feed, EMAStrategy(), journal, tf) + } + } + jobs.joinAll() + assertEquals(mrj.getRuns(), QuestDBJournal.getRuns(folder2.toPath())) + assertEquals(tfs.size, mrj.getMetric("account.equity").size) + } + } diff --git a/roboquant-questdb/src/test/kotlin/org/roboquant/samples/QuestDBSamples.kt b/roboquant-questdb/src/test/kotlin/org/roboquant/samples/QuestDBSamples.kt index a01852ad..c0835ee9 100644 --- a/roboquant-questdb/src/test/kotlin/org/roboquant/samples/QuestDBSamples.kt +++ b/roboquant-questdb/src/test/kotlin/org/roboquant/samples/QuestDBSamples.kt @@ -16,14 +16,17 @@ package org.roboquant.samples -import org.roboquant.common.Timeframe -import org.roboquant.common.months -import org.roboquant.common.seconds +import kotlinx.coroutines.runBlocking +import org.roboquant.common.* import org.roboquant.feeds.PriceBar import org.roboquant.feeds.filter import org.roboquant.feeds.random.RandomWalkFeed +import org.roboquant.journals.MultiRunJournal import org.roboquant.questdb.QuestDBFeed +import org.roboquant.questdb.QuestDBJournal import org.roboquant.questdb.QuestDBRecorder +import org.roboquant.runAsync +import org.roboquant.strategies.EMAStrategy import kotlin.system.measureTimeMillis import kotlin.test.Ignore import kotlin.test.Test @@ -40,6 +43,24 @@ internal class QuestDBSamples { } + @Test + @Ignore + internal fun parallel() = runBlocking { + val feed = RandomWalkFeed(Timeframe.past(4.years), nAssets = 3) + val jobs = ParallelJobs() + val mrj = MultiRunJournal { run -> QuestDBJournal(table = run) } + + for (tf in feed.timeframe.split(1.years)) { + jobs.add { + val acc = runAsync(feed, EMAStrategy(),mrj.getJournal(), timeframe = tf) + println(acc) + } + } + + jobs.joinAll() + println("done") + } + @Test @Ignore diff --git a/roboquant-server/src/main/kotlin/org/roboquant/server/RunInfo.kt b/roboquant-server/src/main/kotlin/org/roboquant/server/RunInfo.kt index 5c83dacb..8b9e6889 100644 --- a/roboquant-server/src/main/kotlin/org/roboquant/server/RunInfo.kt +++ b/roboquant-server/src/main/kotlin/org/roboquant/server/RunInfo.kt @@ -20,7 +20,7 @@ import org.roboquant.Roboquant import org.roboquant.common.TimeSpan import org.roboquant.common.Timeframe import org.roboquant.feeds.Feed -import org.roboquant.journals.MetricsJournal +import org.roboquant.journals.MemoryJournal /** * Stored information about a single run @@ -28,7 +28,7 @@ import org.roboquant.journals.MetricsJournal internal data class RunInfo( val roboquant: Roboquant, val feed: Feed, - val journal: MetricsJournal, + val journal: MemoryJournal, val timeframe: Timeframe, val warmup: TimeSpan = TimeSpan.ZERO, var done: Boolean = false diff --git a/roboquant-server/src/main/kotlin/org/roboquant/server/WebServer.kt b/roboquant-server/src/main/kotlin/org/roboquant/server/WebServer.kt index 95a331ac..b543ded7 100644 --- a/roboquant-server/src/main/kotlin/org/roboquant/server/WebServer.kt +++ b/roboquant-server/src/main/kotlin/org/roboquant/server/WebServer.kt @@ -24,7 +24,7 @@ import org.roboquant.common.Config import org.roboquant.common.TimeSpan import org.roboquant.common.Timeframe import org.roboquant.feeds.Feed -import org.roboquant.journals.MetricsJournal +import org.roboquant.journals.MemoryJournal import java.util.concurrent.ConcurrentHashMap import kotlin.collections.set @@ -105,7 +105,7 @@ class WebServer(configure: WebServerConfig.() -> Unit = {}) { fun run( roboquant: Roboquant, feed: Feed, - journal: MetricsJournal = MetricsJournal(), + journal: MemoryJournal = MemoryJournal(), timeframe: Timeframe, name: String = getRunName(), warmup: TimeSpan = TimeSpan.ZERO, @@ -121,7 +121,7 @@ class WebServer(configure: WebServerConfig.() -> Unit = {}) { suspend fun runAsync( roboquant: Roboquant, feed: Feed, - journal: MetricsJournal = MetricsJournal(), + journal: MemoryJournal = MemoryJournal(), timeframe: Timeframe, name: String = getRunName(), warmup: TimeSpan = TimeSpan.ZERO, diff --git a/roboquant-server/src/test/kotlin/org/roboquant/samples/ServerSamples.kt b/roboquant-server/src/test/kotlin/org/roboquant/samples/ServerSamples.kt index c518354a..ba1efad0 100644 --- a/roboquant-server/src/test/kotlin/org/roboquant/samples/ServerSamples.kt +++ b/roboquant-server/src/test/kotlin/org/roboquant/samples/ServerSamples.kt @@ -19,7 +19,7 @@ package org.roboquant.samples import org.roboquant.Roboquant import org.roboquant.common.* import org.roboquant.feeds.random.RandomWalkLiveFeed -import org.roboquant.journals.MetricsJournal +import org.roboquant.journals.MemoryJournal import org.roboquant.metrics.PriceMetric import org.roboquant.server.WebServer import org.roboquant.strategies.EMAStrategy @@ -34,7 +34,7 @@ internal class ServerSamples { Roboquant(EMAStrategy()) private fun getJournal() = - MetricsJournal(PriceMetric("CLOSE")) + MemoryJournal(PriceMetric("CLOSE")) /** * You can run this sample to start a server with three runs diff --git a/roboquant/src/main/kotlin/org/roboquant/feeds/HistoricPriceFeed.kt b/roboquant/src/main/kotlin/org/roboquant/feeds/HistoricPriceFeed.kt index 13bfb97e..5fecb132 100644 --- a/roboquant/src/main/kotlin/org/roboquant/feeds/HistoricPriceFeed.kt +++ b/roboquant/src/main/kotlin/org/roboquant/feeds/HistoricPriceFeed.kt @@ -94,4 +94,11 @@ open class HistoricPriceFeed : HistoricFeed { for (event in feed.events) addAll(event.key, event.value) } + override fun toString(): String { + return if (events.isEmpty()) + "events=${events.size} assets=${assets.size}" + else + "events=${events.size} start=${events.firstKey()} end=${events.lastKey()} assets=${assets.size}" + } + } diff --git a/roboquant/src/main/kotlin/org/roboquant/journals/MetricsJournal.kt b/roboquant/src/main/kotlin/org/roboquant/journals/MemoryJournal.kt similarity index 69% rename from roboquant/src/main/kotlin/org/roboquant/journals/MetricsJournal.kt rename to roboquant/src/main/kotlin/org/roboquant/journals/MemoryJournal.kt index 68f4d504..8d1549c0 100644 --- a/roboquant/src/main/kotlin/org/roboquant/journals/MetricsJournal.kt +++ b/roboquant/src/main/kotlin/org/roboquant/journals/MemoryJournal.kt @@ -9,13 +9,29 @@ import org.roboquant.strategies.Signal import java.time.Instant import java.util.* +interface MetricsJournal: Journal { + + /** + * Return all the metrics that are contained in this journal + * @return Set + */ + fun getMetricNames() : Set + + /** + * Return a metric + * @param name String + * @return TimeSeries + */ + fun getMetric(name: String): TimeSeries +} + /** + * This journal stores the results of one or more metrics in memory * * @property metrics Array - * @property history MutableList * @constructor */ -class MetricsJournal(private vararg val metrics: Metric) : Journal { +class MemoryJournal(private vararg val metrics: Metric) : MetricsJournal { private val history = TreeMap>() @@ -32,7 +48,7 @@ class MetricsJournal(private vararg val metrics: Metric) : Journal { history.clear() } - fun getMetricNames() : Set { + override fun getMetricNames() : Set { return history.values.map { it.keys }.flatten().toSet() } @@ -41,7 +57,7 @@ class MetricsJournal(private vararg val metrics: Metric) : Journal { * @param name String * @return TimeSeries */ - fun getMetric(name: String): TimeSeries { + override fun getMetric(name: String): TimeSeries { val timeline = mutableListOf() val values = mutableListOf() for ( (t, d) in history) { diff --git a/roboquant/src/main/kotlin/org/roboquant/journals/MultiRunJournal.kt b/roboquant/src/main/kotlin/org/roboquant/journals/MultiRunJournal.kt index 1bbcdcac..804f518a 100644 --- a/roboquant/src/main/kotlin/org/roboquant/journals/MultiRunJournal.kt +++ b/roboquant/src/main/kotlin/org/roboquant/journals/MultiRunJournal.kt @@ -4,15 +4,20 @@ import org.roboquant.common.TimeSeries /** * - * @property fn Function1 - * @property journals MutableMap + * @property fn Function1 + * @property journals MutableMap * @constructor */ class MultiRunJournal(private val fn: (String) -> MetricsJournal) { private val journals = mutableMapOf() - fun getJournal(run: String): MetricsJournal { + companion object { + private var cnt = 0 + } + + @Synchronized + fun getJournal(run: String = "run-${cnt++}"): MetricsJournal { if (run !in journals) { val journal = fn(run) journals[run] = journal @@ -20,7 +25,19 @@ class MultiRunJournal(private val fn: (String) -> MetricsJournal) { return journals.getValue(run) } - fun getRuns() = journals.keys.toList() + /** + * Load existing runs + * @param runs List + */ + fun load(runs: List) { + for (run in runs) {getJournal(run)} + } + + /** + * Get the currently available runs + * @return Set + */ + fun getRuns() : Set = journals.keys fun getMetric(name: String) : Map { return journals.mapValues { it.value.getMetric(name) } diff --git a/roboquant/src/test/kotlin/org/roboquant/RoboquantTest.kt b/roboquant/src/test/kotlin/org/roboquant/RoboquantTest.kt index 0694f916..96f41d01 100644 --- a/roboquant/src/test/kotlin/org/roboquant/RoboquantTest.kt +++ b/roboquant/src/test/kotlin/org/roboquant/RoboquantTest.kt @@ -24,7 +24,7 @@ import org.roboquant.feeds.random.RandomWalkFeed import org.roboquant.feeds.util.HistoricTestFeed import org.roboquant.feeds.util.LiveTestFeed import org.roboquant.journals.BasicJournal -import org.roboquant.journals.MetricsJournal +import org.roboquant.journals.MemoryJournal import org.roboquant.journals.MultiRunJournal import org.roboquant.metrics.PNLMetric import org.roboquant.strategies.EMAStrategy @@ -124,7 +124,7 @@ internal class RoboquantTest { @Test fun run3() { - val mrj = MultiRunJournal { MetricsJournal(PNLMetric()) } + val mrj = MultiRunJournal { MemoryJournal(PNLMetric()) } val feed = TestData.feed val timeframes = feed.timeframe.split(1.years) for (tf in timeframes) { diff --git a/roboquant/src/test/kotlin/org/roboquant/feeds/LiveFeedTest.kt b/roboquant/src/test/kotlin/org/roboquant/feeds/LiveFeedTest.kt index 0b975763..df1d9b9f 100644 --- a/roboquant/src/test/kotlin/org/roboquant/feeds/LiveFeedTest.kt +++ b/roboquant/src/test/kotlin/org/roboquant/feeds/LiveFeedTest.kt @@ -19,7 +19,7 @@ package org.roboquant.feeds import kotlinx.coroutines.* import org.junit.jupiter.api.assertDoesNotThrow import org.roboquant.common.* -import org.roboquant.journals.MetricsJournal +import org.roboquant.journals.MemoryJournal import org.roboquant.metrics.ProgressMetric import org.roboquant.strategies.EMAStrategy import java.time.Instant @@ -86,7 +86,7 @@ internal class LiveFeedTest { tf.sample(200.millis, 10, resolution = ChronoUnit.MILLIS).forEach { jobs.add { - val j = MetricsJournal(ProgressMetric()) + val j = MemoryJournal(ProgressMetric()) org.roboquant.runAsync(feed, EMAStrategy(), j, it) val actions = j.getMetric("progress.actions").values.last() assertTrue(actions > 2) diff --git a/roboquant/src/test/kotlin/org/roboquant/journals/MultiRunJournalTest.kt b/roboquant/src/test/kotlin/org/roboquant/journals/MultiRunJournalTest.kt index 9d752a22..18b3f39e 100644 --- a/roboquant/src/test/kotlin/org/roboquant/journals/MultiRunJournalTest.kt +++ b/roboquant/src/test/kotlin/org/roboquant/journals/MultiRunJournalTest.kt @@ -15,7 +15,7 @@ internal class MultiRunJournalTest { fun basic() { val feed = RandomWalkFeed.lastYears(5) val mrj = MultiRunJournal { - MetricsJournal(AccountMetric()) + MemoryJournal(AccountMetric()) } val tfs = feed.timeframe.split(1.years) for (tf in tfs) { diff --git a/roboquant/src/test/kotlin/org/roboquant/metrics/AlphaBetaMetricTest.kt b/roboquant/src/test/kotlin/org/roboquant/metrics/AlphaBetaMetricTest.kt index 5f86dcba..0dc5e491 100644 --- a/roboquant/src/test/kotlin/org/roboquant/metrics/AlphaBetaMetricTest.kt +++ b/roboquant/src/test/kotlin/org/roboquant/metrics/AlphaBetaMetricTest.kt @@ -23,7 +23,7 @@ import org.roboquant.common.Currency import org.roboquant.common.Size import org.roboquant.feeds.random.RandomWalkFeed import org.roboquant.feeds.toList -import org.roboquant.journals.MetricsJournal +import org.roboquant.journals.MemoryJournal import org.roboquant.strategies.EMAStrategy import kotlin.test.Test import kotlin.test.assertContains @@ -36,7 +36,7 @@ internal class AlphaBetaMetricTest { val feed = TestData.feed val strategy = EMAStrategy.PERIODS_5_15 val alphaBetaMetric = AlphaBetaMetric(50) - val logger = MetricsJournal(alphaBetaMetric) + val logger = MemoryJournal(alphaBetaMetric) org.roboquant.run(feed, strategy,logger) val alpha = logger.getMetric("account.alpha").last().value diff --git a/roboquant/src/test/kotlin/org/roboquant/metrics/ReturnsMetricTest.kt b/roboquant/src/test/kotlin/org/roboquant/metrics/ReturnsMetricTest.kt index 2e371401..86559694 100644 --- a/roboquant/src/test/kotlin/org/roboquant/metrics/ReturnsMetricTest.kt +++ b/roboquant/src/test/kotlin/org/roboquant/metrics/ReturnsMetricTest.kt @@ -20,7 +20,7 @@ import org.roboquant.TestData import org.roboquant.common.months import org.roboquant.feeds.random.RandomWalkFeed import org.roboquant.feeds.util.HistoricTestFeed -import org.roboquant.journals.MetricsJournal +import org.roboquant.journals.MemoryJournal import org.roboquant.strategies.EMAStrategy import org.roboquant.strategies.TestStrategy import kotlin.test.Test @@ -41,7 +41,7 @@ internal class ReturnsMetricTest { fun basic2() { val metric = ReturnsMetric2(minSize = 250) val feed = RandomWalkFeed.lastYears(2) - val j = MetricsJournal(metric) + val j = MemoryJournal(metric) org.roboquant.run(feed, EMAStrategy(), j) assertContains(j.getMetricNames(), "returns.sharperatio") } @@ -51,7 +51,7 @@ internal class ReturnsMetricTest { val feed = HistoricTestFeed(100..300) val strategy = TestStrategy() val metric = ReturnsMetric(period = 1.months) - val logger = MetricsJournal(metric) + val logger = MemoryJournal(metric) org.roboquant.run(feed, strategy, logger) val sharpRatio = logger.getMetric("returns.sharperatio").last().value