Candles Provider
Provider that supplies data about candles for the selected trading instrument. The chart listens to candlesPublisher
, a flow of candle data updates, and reacts to changes in it by updating the displayed information accordingly.
Whenever there is a new candle data emitted by candlesPublisher
, the chart updates to reflect this new data, ensuring that the visual representation of the trading instrument's price movements is always current and accurate.
import Combine/// Protocol for receiving candle data and managing the state of the connection.////// Provider that supplies data about candles for the selected trading instrument. The chart listens to `candlesPublisher`,/// a flow of candle data updates, and reacts to changes in it by updating the displayed information accordingly./// Whenever there is a new candle data emitted by `candlesPublisher`, the chart updates to reflect this new data,/// ensuring that the visual representation of the trading instrument's price movements is always current and accurate.public protocol CandlesProvider {/// Publisher for receiving an array of `Candle` data.////// Provides the publisher of candle data for the requested symbol.var candlesPublisher: AnyPublisher<[Candle], Never> { get }/// Publisher for receiving the availability state of candle data.////// Emits `true` when data is available, `false` otherwise.var isDataAvailable: AnyPublisher<Bool, Never> { get }/// Publisher for receiving the loading state of candle data after parameters change.////// Emits `true` when candle data is being loaded, `false` when loading is complete.var isLoading: AnyPublisher<Bool, Never> { get }/// Publisher for receiving the current connection state.////// Tracks the state of the connection to the candle data provider.var connectionState: AnyPublisher<DXChart.ConnectionState, Never> { get }/// Updates the parameters for receiving candle data.////// - Parameters:/// - symbol: The instrument symbol for which the candle data is requested./// - priceType: The type of price to use for the instrument (bid, ask, last, market)./// - aggregation: The aggregation settings for the candle data (time interval or granularity)./// - extendedHours: Flag indicating whether to include extended trading hours in the data./// - alignSessionStart: Flag indicating whether to align data to the start of the trading session.func changeParams(symbol: String,priceType: PriceType,aggregation: Aggregation,extendedHours: Bool,alignSessionStart: Bool)}
Candle data
/// Represents a financial market candle (OHLCV data) for a specific time period.////// The `Candle` structure holds data related to a specific time period, commonly used in financial charts to represent price movement./// It includes open, high, low, and close prices, along with volume, timestamp, and optional VWAP (volume-weighted average price).////// - Commonly used in candlestick charts to show price movement within a given time interval.public struct Candle: Codable, Hashable {/// The closing price of the asset for the specific time period.public var close: Double/// The highest price reached during the specific time period.public var hi: Double/// The lowest price reached during the specific time period.public var lo: Double/// The opening price of the asset for the specific time period.public var open: Double/// The timestamp (in milliseconds since epoch) representing the start of the time period.////// This timestamp is often used to place the candle in the correct position in a time-based chart.public var timestamp: Int64/// The trading volume (number of units traded) during the specific time period.public var volume: Double/// The volume-weighted average price (VWAP) for the specific time period.////// VWAP is an optional value that represents the average price weighted by volume. It helps indicate the true average price over the time period.public var vwap: Double?}
Default implementation
If you haven't implemented a Candles data provider earlier and are using DXFeedFramework
to retrieve data, you can use our default provider. To work with it and DXFeedFramework, you need a Quote Address to obtain candles and quotes data. Please contact your DXFeed sales manager or visit https://dxfeed.com/contact-sales/ for assistance.
Default implementation code:
import Combineimport DXChartimport DXFeedFrameworkfinal class DXFeedCandlesProvider {private var _candlesPublisher = PassthroughSubject<[DXChart.Candle], Never>()private var _isDataAvailable = PassthroughSubject<Bool, Never>()private var _isLoading = PassthroughSubject<Bool, Never>()private var _connectionState = PassthroughSubject<DXChart.ConnectionState, Never>()private let address: Stringprivate var symbol: String = ""private var priceType: DXChart.PriceTypeprivate var aggregation: DXChart.Aggregationprivate var extendedHours: Boolprivate var alignDataWithSessionStart: Bool/// Start Date store value in milliseconds from `Wed, 31 Dec 2014 20:00:00 GMT`private let startDate: Int64 = Date(timeIntervalSince1970: 1420056000).millisecondsSince1970private var timeSymbol: TimeSeriesSubscriptionSymbol {let aggregation = aggregationlet priceType = priceType == .last ? "," : ",price=(priceType.asString),"let symbolStr = "(symbol){=(aggregation.forConnection)(priceType)tho=(!extendedHours),(alignDataWithSessionStart ? "a=s" : "")}"let timeSeriesSubscriptionSymbol = TimeSeriesSubscriptionSymbol(symbol: symbolStr, fromTime: startDate)return timeSeriesSubscriptionSymbol}private var debouncingTimer: Timer? = nilprivate let debouncingTime: TimeInterval = 12private var receivedEvent = falseprivate var candleSubscription: DXFeedSubscription?private var endpoint: DXEndpoint?private let snapshotProcessor = SnapshotProcessor()init(address: String, symbol: String, priceType: DXChart.PriceType, aggregation: DXChart.Aggregation, extendedHours: Bool, alignDataWithSessionStart: Bool) {self.address = addressself.symbol = symbolself.priceType = priceTypeself.aggregation = aggregationself.extendedHours = extendedHoursself.alignDataWithSessionStart = alignDataWithSessionStartsnapshotProcessor.add(self)connect()}convenience init(address: String, dataStorage: DXChart.DataStorage, settingsManager: DXChart.SettingsManager) {self.init(address: address,symbol: dataStorage.instrument.symbol,priceType: settingsManager.priceType,aggregation: dataStorage.aggregation,extendedHours: settingsManager.extendedHours,alignDataWithSessionStart: settingsManager.alignDataWithSessionStart)}}extension DXFeedCandlesProvider: DXChart.CandlesProvider {var candlesPublisher: AnyPublisher<[DXChart.Candle], Never> {_candlesPublisher.eraseToAnyPublisher()}var isDataAvailable: AnyPublisher<Bool, Never> {_isDataAvailable.eraseToAnyPublisher()}var isLoading: AnyPublisher<Bool, Never> {_isLoading.eraseToAnyPublisher()}var connectionState: AnyPublisher<DXChart.ConnectionState, Never> {_connectionState.eraseToAnyPublisher()}func changeParams(symbol: String, priceType: DXChart.PriceType, aggregation: DXChart.Aggregation, extendedHours: Bool, alignSessionStart: Bool) {self.symbol = symbolself.priceType = priceTypeself.aggregation = aggregationself.extendedHours = extendedHoursself.alignDataWithSessionStart = alignSessionStartreconnect()}}private extension DXFeedCandlesProvider {func connect() {receivedEvent = falsecandleSubscription = nil_isLoading.send(true)try? SystemProperty.setProperty(DXEndpoint.ExtraProperty.heartBeatTimeout.rawValue, "10s")if endpoint == nil {endpoint = try? DXEndpoint.builder().withRole(.feed).build()endpoint?.add(listener: self)}_ = try? endpoint?.connect(address)candleSubscription = try? endpoint?.getFeed()?.createSubscription(Candle.self)try? candleSubscription?.add(listener: snapshotProcessor)try? candleSubscription?.addSymbols(timeSymbol)checkData()}func disconnect() {candleSubscription?.remove(listener: snapshotProcessor)candleSubscription = nil}func reconnect() {disconnect()connect()}func checkData() {DispatchQueue.main.async {guard self.debouncingTimer == nil || self.debouncingTimer?.isValid == false else { return }self.debouncingTimer = Timer.scheduledTimer(withTimeInterval: self.debouncingTime,repeats: false) { [weak self] timer intimer.invalidate()guard let self else { return }self.debouncingTimer = nilguard !self.receivedEvent else { return }self._isDataAvailable.send(false)self._isLoading.send(false)}}}}extension DXFeedCandlesProvider: Hashable {static func == (lhs: DXFeedCandlesProvider, rhs: DXFeedCandlesProvider) -> Bool {return lhs === rhs || lhs.address == rhs.address}func hash(into hasher: inout Hasher) {hasher.combine(address)}}extension DXFeedCandlesProvider: SnapshotDelegate {func receiveEvents(_ events: [DXFeedFramework.MarketEvent], isSnapshot: Bool) {receivedEvent = true_candlesPublisher.send(events.convertToCandles())_isDataAvailable.send(true)_isLoading.send(false)debouncingTimer?.invalidate()debouncingTimer = nil}}private extension Array where Element == DXFeedFramework.MarketEvent {func convertToCandles() -> [DXChart.Candle] {guard let dxCandlesArray = self as? [DXFeedFramework.Candle] else { return [] }let candlesArray: [DXChart.Candle] = dxCandlesArray.compactMap { event inlet candle = event.candlereturn DXChart.Candle(close: candle.close,hi: candle.high,lo: candle.low,open: candle.open,timestamp: candle.time,volume: candle.volume.isNaN ? 0 : candle.volume,vwap: candle.vwap.isNaN ? nil : candle.vwap)}return candlesArray.reversed()}}extension DXFeedCandlesProvider: DXEndpointListener {func endpointDidChangeState(old: DXFeedFramework.DXEndpointState, new: DXFeedFramework.DXEndpointState) {let result: ConnectionState =switch (new) {case .notConnected: .notConnectedcase .connecting: .connectingcase .connected: .connectedcase .closed: .closed@unknown default: .notConnected}_connectionState.send(result)}}// String representation for DXFeed connectionextension Aggregation {var forConnection: String {switch timeUnit {case .second: "(value)s"case .minute: "(value)m"case .hour: "(value)h"case .day: "(value)d"case .week: "(value)w"case .month: "(value)mo"case .year: "(value)y"}}}extension DXChart.PriceType {var asString: String {switch self {case .market: "mark"case .bid: "bid"case .ask: "ask"case .last: "last"}}}