Analysing SignalR with Wireshark & Pharo

I’m trying to hook Pharo into the real-time order book feed from the Bittrex cryptocurrency exchange, which is delivered via the SignalR protocol on top of a websocket connection.

SignalR is a Microsoft ASP.NET library designed to establish “persistent connections”. Using SignalR outside the Microsoft garden needs some protocol hacking, done by watching other implementations in action. I used python-bittrex-websocket, and Pawel Kadluczka’s Informal Description of the SignalR Protocol was also a great help.

Note: As a general philosophy, I’m ignoring SignalR’s fallback to non-websocket protocols.

Connection Negotiate

As the first step in starting a SignalR connection, the client sends the server a negotiate request. Following on from my Pharo v. Cloudflare post, I’ll presume you’ve done steps 1 to 6 from there. So start a new Wireshark capture, and once again run…

$ python order_book.py

[Image: Wireshark capture of the SignalR handshake]

Ignoring the first few packets that CloudflareUn will handle, the SignalR negotiate request is seen at packet #26, with its response at packet #29. This part can be replicated in Pharo as… (btw, anyone unfamiliar with Pharo syntax should read the first three pages here)


cloudflareun := CloudflareUn knockUrl: 'http://bittrex.com'.
client := cloudflareun client.
client url: 'http://bittrex.com/signalr/negotiate'.
client queryAt: 'connectionData' put: '[{"name": "coreHub"}]'.
client queryAt: 'clientProtocol' put: '1.5'.
(response := client get) inspect. 

==>
{   "Url":"/signalr",
    "ConnectionToken":"MT55CnH4LlWNxjX0Q5w<...snip...>",
    "ConnectionId":"fa8d0fc5-b8d0-4925-bc63-7aa8984b1f4d",
    "KeepAliveTimeout":20.0,
    "DisconnectTimeout":30.0,
    "ConnectionTimeout":110.0,
    "TryWebSockets":true,
    "ProtocolVersion":"1.5",
    "TransportConnectTimeout":5.0,
    "LongPollDelay":0.0
}

The meaning of these fields is:

  • ConnectionToken – For each request, the client and server pass a connection token which contains the connection id and username for authenticated users.
  • ConnectionId – Each client has its own unique id that is randomly generated when the client connects to the server hub. It persists for the duration of the SignalR session and can be used to reacquire broken connections.
  • ConnectionTimeout – Represents the amount of time to leave a connection open before timing out. Default is 110 seconds. This setting applies only when keepalive functionality is disabled, which normally applies only to the long polling transport, and so is not relevant to the current use case.
  • DisconnectTimeout – Represents the amount of time to wait after a connection goes away before raising the disconnect event. Default is 30 seconds.
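  • KeepAliveTimeout – How long the client should go without seeing a keep-alive before treating the connection as lost. SignalR derives it as twice the server’s KeepAlive setting, which is the amount of time to wait before sending a keep-alive packet over an idle connection (10 seconds by default; set to null to disable keep-alive). When keep-alive is on, ConnectionTimeout has no effect.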
  • TransportConnectTimeout – The amount of time a client should allow to connect before falling back to another transport or failing. This is not relevant, since we won’t be falling back to non-websocket protocols.
  • LongPollDelay – Not relevant to websockets.

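Those timeouts are described more thoroughly at Understanding and Handling Connection Lifetime Events in SignalR. Interestingly, it says “KeepAlive must not be more than 1/3 of the DisconnectTimeout value”, which at first glance the Bittrex configuration seems to break. However, the negotiated KeepAliveTimeout is the client-side timeout, which SignalR sets to twice the server’s KeepAlive, so KeepAlive here is 10 seconds: exactly 1/3 of the 30 second DisconnectTimeout. In any case, perhaps client–>server keep-alives should be sent initially at 5 second intervals [opinions anyone?].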
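A minimal Playground sketch of pulling those fields out with NeoJSON, for a client that only speaks websockets. The JSON literal is an abridged copy of the negotiate response above (with the token still snipped); in a live session it would be the string returned by client get. The temporaries are illustrative names only.

```smalltalk
| negotiateJson params connectionToken keepAliveTimeout disconnectTimeout |
"Illustrative input: abridged copy of the negotiate response shown above"
negotiateJson := '{"Url":"/signalr","ConnectionToken":"MT55CnH4LlWNxjX0Q5w<...snip...>",
	"ConnectionId":"fa8d0fc5-b8d0-4925-bc63-7aa8984b1f4d","KeepAliveTimeout":20.0,
	"DisconnectTimeout":30.0,"ConnectionTimeout":110.0,"TryWebSockets":true,"ProtocolVersion":"1.5"}'.
params := NeoJSONReader fromString: negotiateJson.
"We ignore the non-websocket fallbacks, so give up early if websockets are not offered"
(params at: 'TryWebSockets') ifFalse: [ self error: 'Server does not offer websockets' ].
connectionToken := params at: 'ConnectionToken'.
"Timeouts arrive as plain numbers of seconds"
keepAliveTimeout := params at: 'KeepAliveTimeout'.
disconnectTimeout := params at: 'DisconnectTimeout'.
```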

Connection Connect

The next step in the SignalR protocol is seen in packet #36, where the client sends a connect request carrying forward the parameters connectionData & clientProtocol and adding the new parameters connectionToken & transport. It also asks to upgrade the connection from HTTP to the websocket protocol. The response in packet #38 shows the successful upgrade, and packet #40 carries the first websocket message…

HTTP REQUEST (#36)
GET /signalr/connect?connectionToken=ulMFV4z5JG%2BxfSpMX3A4%2BS%2FFay55rJr1Y[...truncated...]
    &connectionData=%5B%7B%22name%22%3A+%22coreHub%22%7D%5D
    &transport=webSockets
    &clientProtocol=1.5
Cookie: __cfduid=df17ba99a887664411404c9f88347504f1517897211;
  cf_clearance=097eaac668c0eb0db0a3cceb265e6ef7ea0a384c-1517897216-10800
Connection: Upgrade
Upgrade: websocket
Sec-WebSocket-Key: YymChuidZ4RPOna5ZwD1EQ==
Sec-WebSocket-Version: 13

HTTP RESPONSE (#38)
HTTP/1.1 101 Switching Protocols

WEBSOCKET S-->C (#40)
{"C":"d-4F5DA126-B,0|F8TuM,0|F8TuN,1","S":1,"M":[]}

We’ll use NeoJSON to parse the connectionToken out of the negotiate response…


Gofer it
   smalltalkhubUser: 'SvenVanCaekenberghe' project: 'Neo';
   configurationOf: 'NeoJSON';
   loadStable.

(params := NeoJSONReader fromString: response) inspect.
connectionToken := params at: 'ConnectionToken'.
==>MT55CnH4LlWNxjX0Q5w<...snip...>

Looking under the hood of Pharo’s websocket implementation we see it adds the required headers for the protocol upgrade…


(ZnWebSocket class >> #webSocketClientTo:) inspect

==> (in the inspector's Source tab)
webSocketClientTo: url
	"Create and return a new ZnClient instance ready for the initial client side WebSocket setup request"
	| client |
	self assert: (#(ws wss) includes: url scheme).
	(client := ZnClient new)
		url: url;
		method: #GET;
		headerAt: 'Upgrade' put: 'websocket';
		headerAt: 'Connection' put: 'Upgrade';
		headerAt: 'Sec-WebSocket-Version' put: '13';
		headerAt: 'Sec-WebSocket-Key' put: ZnWebSocketUtils newClientKey.
	^ client

but this is not quite suitable for us, since it creates a fresh ZnClient while we need to use the ZnClient preconfigured by cloudflareun (which carries the Cloudflare cookies seen in packet #36). Since this is Pharo, we can easily tweak that #webSocketClientTo: into a new method…


ZnWebSocket class >> upgradeSocketClient: aZnClient
	"Make an existing ZnClient instance ready for the initial client side WebSocket setup request"
	aZnClient
		method: #GET;
		headerAt: 'Upgrade' put: 'websocket';
		headerAt: 'Connection' put: 'Upgrade';
		headerAt: 'Sec-WebSocket-Version' put: '13';
		headerAt: 'Sec-WebSocket-Key' put: ZnWebSocketUtils newClientKey.

and also tweak ZnWebSocket class >> #to: into a new method that calls it…


ZnWebSocket class >> onHttpClient: client
	"Attempt to upgrade an existing http client client to a WebSocket.
	Do the initial upgrade handshake and return a functioning ZnWebSocket object.
	Signals a ZnWebSocketFailed error when unsuccessful."

	self upgradeSocketClient: client.
	client execute.
	(self isValidWebSocketResponse: client)
		ifTrue: [
			^ (self onStream: client connection)
				role: #client;
				yourself ]
		ifFalse: [
			client close.
			(ZnWebSocketFailed response: client response) signal ]

Now we are ready to build our connect request to match packet #36. Continuing in Playground…


client := cloudflareun client.
client url: 'http://bittrex.com/signalr/connect'.
client queryAt: 'connectionToken' put: connectionToken.
client queryAt: 'connectionData' put: '[{"name": "coreHub"}]'.
client queryAt: 'transport' put: 'webSockets'.
client queryAt: 'clientProtocol' put: '1.5'.
(websocket := ZnWebSocket onHttpClient: client) inspect.
Transcript open.
[websocket runWith: [ :msg | Transcript crShow: msg printString ]] forkAt: 35.

Watching Wireshark, we see the request, and the response is the hoped-for “HTTP/1.1 101 Switching Protocols\r\n”, just like packet #38… And, also just like packet #40, we receive a WebSocket packet containing…

WEBSOCKET S-->C
    {"C":"d-3D5EB89-B,0|Ek,0|El,2","S":1,"M":[]} 

Deciphering this: “C” is a message ID; an “S” of 1 indicates the transport was initialized; and “M” is empty, carrying no actual data.

Subsequently every 8 seconds or so we receive a new WebSocket packet with empty data…

WEBSOCKET S-->C
     {} 

until we close the connection in the Inspector that appeared…
[Image: Inspector on the websocket]
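The frames seen so far can be told apart with NeoJSON alone. A small sketch, assuming each websocket text message is a single JSON object: the first frame carries “C”, “S” and “M”, while the periodic {} frames parse to an empty dictionary. The variable names are illustrative.

```smalltalk
| frame messageId initialized hubMessages keepAlive |
"Illustrative input: the first frame received after the upgrade"
frame := NeoJSONReader fromString: '{"C":"d-3D5EB89-B,0|Ek,0|El,2","S":1,"M":[]}'.
messageId := frame at: 'C' ifAbsent: [ nil ].
"S = 1 means the transport was initialized"
initialized := (frame at: 'S' ifAbsent: [ 0 ]) = 1.
"M holds hub messages; it is empty here"
hubMessages := frame at: 'M' ifAbsent: [ #() ].
"The idle-time frames are just {}"
keepAlive := (NeoJSONReader fromString: '{}') isEmpty.
```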

Hub Start


To complete our SignalR handshake, we need to send a start request over HTTP similar to packet #42, and hope for a response like packet #48.

HTTP REQUEST (#42)
GET /signalr/start?connectionToken=ulMFV4z5JG%2BxfSpMX3A4%2BS%2FFay55rJr1Y[...truncated...]
    &connectionData=%5B%7B%22name%22%3A+%22coreHub%22%7D%5D
    &transport=webSockets
    &clientProtocol=1.5
Cookie: cf_clearance=097eaac668c0eb0db0a3cceb265e6ef7ea0a384c-1517897216-10800; __cfduid=df17ba99a887664411404c9f88347504f1517897211

HTTP RESPONSE (#48)
HTTP/1.1 200 OK   (application/json)
{ "Response": "started" }

Now, I found I couldn’t reuse the previous client, because that interfered with the websocket operation: its underlying socket had been taken over by the websocket. But this is Pharo!! So it’s easy to modify the ZnClient system library, by adding the following method to it…


ZnClient >> renounceConnection
	|givingThisAway|
	givingThisAway := connection.
	connection := nil. "so #canReuseConnection ==> false ==> new connection next request"
	^givingThisAway

then make use of it here (redefining #onHttpClient: from above)…


ZnWebSocket class >> onHttpClient: client
	"Attempt to upgrade an existing http client client to a WebSocket.
	Do the initial upgrade handshake and return a functioning ZnWebSocket object.
	Signals a ZnWebSocketFailed error when unsuccessful."

	self upgradeSocketClient: client.
	client execute.
	(self isValidWebSocketResponse: client)
		ifTrue: [
			^ (self onStream: client renounceConnection)
				role: #client;
				yourself ]
		ifFalse: [
			client close.
			(ZnWebSocketFailed response: client response) signal ]

Now we can reuse the http client…


client url: 'http://bittrex.com/signalr/start'.
client queryAt: 'connectionToken' put: connectionToken.
client queryAt: 'connectionData' put: '[{"name": "coreHub"}]'.
client queryAt: 'transport' put: 'webSockets'.
client queryAt: 'clientProtocol' put: '1.5'.
(response := client get) inspect.

==>{ "Response": "started" }

YAY! Lookin’ good.
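The negotiate, connect and start requests repeat most of the same query parameters, so it may be worth folding them into one place. A sketch using Zinc’s ZnUrl; signalrUrl is a hypothetical helper block (not part of Zinc or CloudflareUn), and the token string is a placeholder. The resulting ZnUrl should be usable with client url:, which converts its argument with asZnUrl.

```smalltalk
| signalrUrl connectUrl |
"Hypothetical helper: endpoint is 'negotiate', 'connect' or 'start'; token may be nil"
signalrUrl := [ :endpoint :token | | url |
	url := ('http://bittrex.com/signalr/' , endpoint) asZnUrl.
	token ifNotNil: [ url queryAt: 'connectionToken' put: token ].
	url queryAt: 'connectionData' put: '[{"name": "coreHub"}]'.
	"negotiate does not name a transport yet"
	endpoint = 'negotiate' ifFalse: [ url queryAt: 'transport' put: 'webSockets' ].
	url queryAt: 'clientProtocol' put: '1.5'.
	url ].
"Placeholder token, standing in for the one parsed from the negotiate response"
connectUrl := signalrUrl value: 'connect' value: 'placeholder-connection-token'.
"e.g.  client url: (signalrUrl value: 'start' value: connectionToken)"
```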

Ongoing SignalR Hub Messaging

We now move on to examine the ongoing websocket communication between the Python client and the Bittrex server. Looking first at only the Client–>Server communication, using the Wireshark display filter “(http || websocket) && ip.src == 192.168.43.79”, it seems the general philosophy is to first subscribe to exchange deltas, so that no incremental updates are missed while querying the state of the exchange. The client calls the server’s coreHub >> SubscribeToExchangeDeltas method, naming the hub in “H” and the method in “M”, and passing its arguments in “A”. The client seems responsible for incrementing the invocation identifier “I” with each method call. This is the complete list of methods invoked on the Bittrex server.


WEBSOCKET C-->S (#52)
{"A": ["BTC-ETH"], "H": "coreHub", "M": "SubscribeToExchangeDeltas", "I": 0}

WEBSOCKET C-->S (#53)
{"A": ["BTC-NEO"], "H": "coreHub", "M": "SubscribeToExchangeDeltas", "I": 1}
{"A": ["BTC-ZEC"], "H": "coreHub", "M": "SubscribeToExchangeDeltas", "I": 2}
{"A": ["ETH-NEO"], "H": "coreHub", "M": "SubscribeToExchangeDeltas", "I": 3}
{"A": ["ETH-ZEC"], "H": "coreHub", "M": "SubscribeToExchangeDeltas", "I": 4}

WEBSOCKET C-->S (#124)
{"A": ["BTC-ETH"], "H": "coreHub", "M": "queryExchangeState", "I": 5}

WEBSOCKET C-->S (#227)
{"A": ["BTC-NEO"], "H": "coreHub", "M": "queryExchangeState", "I": 6}

WEBSOCKET C-->S (#305)
{"A": ["BTC-ZEC"], "H": "coreHub", "M": "queryExchangeState", "I": 7}

WEBSOCKET C-->S (#403)
{"A": ["ETH-NEO"], "H": "coreHub", "M": "queryExchangeState", "I": 8}

WEBSOCKET C-->S (#481)
{"A": ["ETH-ZEC"], "H": "coreHub", "M": "queryExchangeState", "I": 9}
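Since the client numbers its own invocations, here is a sketch of encoding each call with the next “I” using NeoJSONWriter. nextInvocation and the Mutex guard are hypothetical, the guard assuming several Pharo processes may invoke hub methods concurrently; the field letters follow the capture above.

```smalltalk
| guard invocationId nextInvocation first second |
guard := Mutex new.
invocationId := -1.
"Hypothetical helper: answers the JSON text for one hub method call"
nextInvocation := [ :hub :method :args | | message |
	message := Dictionary new.
	message at: 'H' put: hub; at: 'M' put: method; at: 'A' put: args.
	"Take the next invocation id atomically"
	guard critical: [
		invocationId := invocationId + 1.
		message at: 'I' put: invocationId ].
	NeoJSONWriter toString: message ].
first := nextInvocation value: 'coreHub' value: 'SubscribeToExchangeDeltas' value: #('BTC-ETH').
second := nextInvocation value: 'coreHub' value: 'SubscribeToExchangeDeltas' value: #('BTC-NEO').
"Each string would then go down the pipe with  websocket sendText: first"
```

If only one process ever sends, the Mutex is unnecessary, but it costs little.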

Now looking at just a few select Server–>Client packets, the “R”eturn values of the coreHub >> SubscribeToExchangeDeltas calls are minimal, indicating only that each callback was successfully set up. An important observation below is that the responses can arrive out of order compared to the invocation order above.

I’d generally presume that waiting for a return value would be synchronous, but all the invocations of SubscribeToExchangeDeltas were issued before any response arrived, so I infer that the Python client is issuing them from multiple threads. Thus we need the asynchronous communication channel to be managed by a thread of its own, with synchronous invocations queued up waiting to be signalled by the communication thread when their response arrives.


WEBSOCKET S-->C (#58)
{"R":true,"I":"0"}

WEBSOCKET S-->C (#68)
{"R":true,"I":"2"}

WEBSOCKET S-->C (#70)
{"R":true,"I":"1"}
{"R":true,"I":"3"}

WEBSOCKET S-->C (#72)
{"R":true,"I":"4"}
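One way to meet that requirement in Pharo, as a sketch: the invoking process registers its invocation id before sending and then waits on a Semaphore, while the websocket reader process signals it when the matching reply arrives, in whatever order. Note that the client sends “I” as a number but the server echoes it back as a string, so the table is keyed by printString. pending, register and deliver are hypothetical names, not an existing API, and error replies are not handled.

```smalltalk
| pending guard register deliver entries |
"Hypothetical table: invocation id as a String -> (Array with: aSemaphore with: theReply)"
pending := Dictionary new.
guard := Mutex new.
"Called by the invoking process BEFORE it sends, so a fast reply is never missed"
register := [ :invocationId | | slot |
	slot := Array with: Semaphore new with: nil.
	guard critical: [ pending at: invocationId printString put: slot ].
	slot ].
"Called by the websocket reader process for each reply frame"
deliver := [ :frame | | slot |
	slot := guard critical: [ pending removeKey: (frame at: 'I') ifAbsent: [ nil ] ].
	slot ifNotNil: [
		slot at: 2 put: (frame at: 'R' ifAbsent: [ nil ]).
		slot first signal ] ].
entries := (0 to: 2) collect: [ :i | register value: i ].
"Replies in the order captured in packets #58, #68 and #70"
#('{"R":true,"I":"0"}' '{"R":true,"I":"2"}' '{"R":true,"I":"1"}')
	do: [ :json | deliver value: (NeoJSONReader fromString: json) ].
"An invoking process would then do  slot first wait.  and read  slot second"
```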

Below can be seen a few callbacks invoking the CoreHub >> updateExchangeState method on the client, with arguments in “A”. No data is provided to match them to an original invocation “I” from above, since the MarketName parameter fully specifies which client data to update. The callback names its hub, so implementing a single SignalrConnection shared by multiple hubs seems appropriate. Waiting synchronous invocations can register themselves as an asynchronous callback that signals the Semaphore they are waiting on.

(btw, I’d be interested if anyone can describe how the message-id “C” comes into play.)

The description here indicates that Type '0' is a simple add operation, Type '1' is a delete operation and Type '2' is a replace/update operation. The nounce seems to be a monotonically increasing sync point, described here as follows: “you must make sure that your order nounces match or are lower than the order book snapshot nounce for the sync to work.”


WEBSOCKET S-->C (#74)
{"C":"d-3908A267-B,0|yT,0|yU,6|BU,3265|BW,2A33|Bi,2ED8|yV,1|yW,0",
"M":[{"H":"CoreHub","M":"updateExchangeState",
    "A":[{"MarketName":"ETH-NEO","Nounce":9910,
      "Buys":[ {"Type":0,"Rate":0.13574727,"Quantity":24.13300000},
               {"Type":1,"Rate":0.13546489,"Quantity":0.0},
               {"Type":1,"Rate":0.13413976,"Quantity":0.0}],
       "Sells":[{"Type":1,"Rate":0.13797594,"Quantity":0.0}],
       "Fills":[]}  ]}  ]}

{"C":"d-3908A267-B,0|yT,0|yU,6|BU,3266|BW,2A33|Bi,2ED8|yV,1|yW,0",
"M":[{"H":"CoreHub","M":"updateExchangeState",
     "A":[{"MarketName":"BTC-ETH","Nounce":12943,
          "Buys":[{"Type":0,"Rate":0.09900001,"Quantity":8.78200000},
                  {"Type":1,"Rate":0.09896192,"Quantity":0.0},
                  {"Type":0,"Rate":0.09865459,"Quantity":5.49451704},
                  {"Type":0,"Rate":0.09863459,"Quantity":49.24190000},
                  {"Type":2,"Rate":0.09818553,"Quantity":200.52821030},
                  {"Type":1,"Rate":0.09270006,"Quantity":0.0},
                  {"Type":1,"Rate":0.09270000,"Quantity":0.0}],
          "Sells":[{"Type":2,"Rate":0.09990997,"Quantity":30.41524754},
                  {"Type":0,"Rate":0.09990998,"Quantity":1.19474349},
                  {"Type":1,"Rate":0.10033083,"Quantity":0.0},
                  {"Type":0,"Rate":0.10036968,"Quantity":9.99786740},
                  {"Type":1,"Rate":0.10347521,"Quantity":0.0},
                  {"Type":1,"Rate":0.10347608,"Quantity":0.0}],
          "Fills":[]}  ]} ]}

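A sketch of how a reader process might route each frame, assuming the frame shapes seen so far: {} is a keep-alive, a top-level “I” marks a reply to one of our invocations, and anything else may carry hub callbacks in “M”. Callbacks are looked up by hub and method name, lower-cased because the server says “CoreHub” where the client said “coreHub”. handlers, route and the returned symbols are hypothetical.

```smalltalk
| handlers route updatedMarkets |
handlers := Dictionary new.
updatedMarkets := OrderedCollection new.
"Hypothetical handler: just records which market changed"
handlers
	at: 'corehub.updateexchangestate'
	put: [ :args | updatedMarkets add: (args first at: 'MarketName') ].
route := [ :json | | frame |
	frame := NeoJSONReader fromString: json.
	frame isEmpty
		ifTrue: [ #keepAlive ]
		ifFalse: [
			(frame includesKey: 'I')
				ifTrue: [ #reply "to be matched against a pending invocation" ]
				ifFalse: [
					(frame at: 'M' ifAbsent: [ #() ]) do: [ :call | | key |
						key := ((call at: 'H') , '.' , (call at: 'M')) asLowercase.
						(handlers at: key ifAbsent: [ [ :args | "unhandled, ignore" ] ])
							value: (call at: 'A') ].
					#hubMessages ] ] ].
```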
The packet below is tagged as the “R”eturn value of the queryExchangeState call with “I”nvocation=5. You can see from the large number of reassembled segments that it is a large stream of data, specifying the full state of orders on the market.


WEBSOCKET S-->C (#295)
[42 Reassembled TCP Segments (57624 bytes): #229, #231, ... #259, #261]
{"R":{"MarketName":null,"Nounce":12947,
"Buys":[
    {"Quantity":0.90897209,"Rate":0.09957987},
    {"Quantity":0.67737287,"Rate":0.09926044},
    <snip>
    {"Quantity":19.88806857,"Rate":0.09900000},
    {"Quantity":0.50000000,"Rate":0.09270006}],
"Sells":[
    {"Quantity":0.01126028,"Rate":0.09957992},
    {"Quantity":2.77036447,"Rate":0.09957994},
    <snip>
    {"Quantity":1.16361678,"Rate":0.10347608},
    {"Quantity":0.03352742,"Rate":0.10347627}],
"Fills":[
    {"Id":209766666,"TimeStamp":"2018-02-09T03:53:43.26",
      "Quantity":0.01107151,"Price":0.09957987,"Total":0.00110249,
      "FillType":"PARTIAL_FILL","OrderType":"SELL"},
    {"Id":209766637,"TimeStamp":"2018-02-09T03:53:37.29",
      "Quantity":0.10760787,"Price":0.09957987,"Total":0.01071557,
      "FillType":"PARTIAL_FILL","OrderType":"SELL"},
    <snip>
    {"Id":209765351,"TimeStamp":"2018-02-09T03:49:34.82",
      "Quantity":0.75099182,"Price":0.09937563,"Total":0.07463028,
      "FillType":"FILL","OrderType":"BUY"},
    {"Id":209765349,"TimeStamp":"2018-02-09T03:49:34.76",
      "Quantity":0.06302414,"Price":0.09926046,"Total":0.00625580,
      "FillType":"PARTIAL_FILL","OrderType":"SELL"}
]},"I":"5"}
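Putting the snapshot and the deltas together, here is a sketch of a local order book for one market. The JSON literals are abridged from packets #295 and #74. Since the snapshot’s MarketName is null, the market has to come from the client’s own record of what invocation 5 asked for. Two assumptions, not confirmed by the capture: a delta’s Quantity is the new total at that Rate, and a delta whose Nounce is not above the book’s is already reflected in the snapshot and can be dropped. All names are hypothetical.

```smalltalk
| requestedMarkets reply snapshot market book applySide applyDelta |
"Hypothetical record of which market each invocation asked about"
requestedMarkets := Dictionary new.
requestedMarkets at: '5' put: 'BTC-ETH'.
"Abridged from packet #295"
reply := NeoJSONReader fromString: '{"R":{"MarketName":null,"Nounce":12947,
	"Buys":[{"Quantity":0.90897209,"Rate":0.09957987},{"Quantity":0.67737287,"Rate":0.09926044}],
	"Sells":[{"Quantity":0.01126028,"Rate":0.09957992}],"Fills":[]},"I":"5"}'.
snapshot := reply at: 'R'.
market := requestedMarkets at: (reply at: 'I').
"book: 'Buys'/'Sells' -> (rate -> quantity), plus the Nounce it is in sync with"
book := Dictionary new.
book at: 'Nounce' put: (snapshot at: 'Nounce').
#('Buys' 'Sells') do: [ :side | | levels |
	levels := Dictionary new.
	(snapshot at: side) do: [ :order | levels at: (order at: 'Rate') put: (order at: 'Quantity') ].
	book at: side put: levels ].
"Type 0 = add, 1 = delete, 2 = replace/update; add and update both overwrite the level (assumption)"
applySide := [ :levels :orders |
	orders do: [ :order |
		(order at: 'Type') = 1
			ifTrue: [ levels removeKey: (order at: 'Rate') ifAbsent: [ nil ] ]
			ifFalse: [ levels at: (order at: 'Rate') put: (order at: 'Quantity') ] ] ].
"Assumption: only deltas newer than the book are applied"
applyDelta := [ :update |
	(update at: 'Nounce') > (book at: 'Nounce') ifTrue: [
		applySide value: (book at: 'Buys') value: (update at: 'Buys').
		applySide value: (book at: 'Sells') value: (update at: 'Sells').
		book at: 'Nounce' put: (update at: 'Nounce') ] ].
"Abridged from packet #74: Nounce 12943 predates the snapshot, so it is dropped"
applyDelta value: (NeoJSONReader fromString: '{"MarketName":"BTC-ETH","Nounce":12943,
	"Buys":[{"Type":0,"Rate":0.09900001,"Quantity":8.78200000},{"Type":1,"Rate":0.09896192,"Quantity":0.0}],
	"Sells":[{"Type":2,"Rate":0.09990997,"Quantity":30.41524754}],"Fills":[]}').
```

In a live client the packet #74 delta actually arrives before the snapshot, so deltas would need buffering until the queryExchangeState reply lands.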

There are some other Server–>Client packets like the following, but at this time I’m not sure how the Group “G” plays into things.


WEBSOCKET S-->C (#56)
{"C":"d-3908A267-B,0|yT,0|yU,2|BU,3265",
"G":"JVEbE2r44ZskGVzF+WDXAfADfPGqtlx<...snip...>",
"M":[] }

All the above seems sufficient for me to code up a SignalR Client MVP for Pharo.

Finer points

Reading around some more, I began to suspect that the handshake start might be more associated with hubs than connections, and wondered if the connectionData=[{"name": "coreHub"}] was really required for the negotiate part of the handshake. So to experiment with negotiating without it…

cloudflareun := CloudflareUn knockUrl: 'http://bittrex.com'.

client := cloudflareun client.
client url: 'http://bittrex.com/signalr/negotiate'.
client queryAt: 'clientProtocol' put: '1.5'.
(response := client get).
(params := NeoJSONReader fromString: response) inspect.

==>
{   "Url":"/signalr",
    "ConnectionToken":"rGeOv5pn7MVxYH9<...snip...>",
    "ConnectionId":"af1e5c12-0e6b-482a-a559-0fde8b62fc78",
    "KeepAliveTimeout":20.0,
    "DisconnectTimeout":30.0,
    "ConnectionTimeout":110.0,
    "TryWebSockets":true,
    "ProtocolVersion":"1.5",
    "TransportConnectTimeout":5.0,
    "LongPollDelay":0.0
}

That’s an identical response to when connectionData was included. But without clientProtocol it defaults to a lower protocol version…

client := cloudflareun client.
client url: 'http://bittrex.com/signalr/negotiate'.
"clientProtocol deliberately omitted this time"
(response := client get).
(params := NeoJSONReader fromString: response) inspect.

==>
{   "Url":"/signalr",
    "ConnectionToken":"4RcmbWI8yYu6sD6IDX<...snip...>",
    "ConnectionId":"54b3569d-a3a5-4910-a369-142bcd951f99",
    "KeepAliveTimeout":20.0,
    "DisconnectTimeout":30.0,
    "ConnectionTimeout":110.0,
    "TryWebSockets":true,
    "ProtocolVersion":"1.2",
    "TransportConnectTimeout":5.0,
    "LongPollDelay":0.0
}

And then I wondered whether clientProtocol was redundant in the subsequent handshake steps, so I slimmed down the remaining handshake…

client := cloudflareun client.
client url: 'http://bittrex.com/signalr/connect'.
client queryAt: 'connectionToken' put: connectionToken.
client queryAt: 'transport' put: 'webSockets'.
(websocket := ZnWebSocket onHttpClient: client) inspect.
Transcript open; clear.
[websocket runWith: [ :msg | Transcript crShow: msg ]] forkAt: 35.

client := cloudflareun client.
client url: 'http://bittrex.com/signalr/start'.
client queryAt: 'connectionToken' put: connectionToken.
client queryAt: 'connectionData' put: '[{"name": "coreHub"}]'.
client queryAt: 'transport' put: 'webSockets'.
(response := client get) inspect.

==>{ "Response": "started" }

And btw, it works equally well if you use “COREHUB”: Microsoft’s usual case insensitivity.

So, does it work? Can we get data…

Leave that websocket running and let’s try replicating the first invocation from above (packet #52). We’ll need to JSON-encode the request, slide it down the pipe and see what we get…


message := Dictionary new.
message at: 'I' put: 0.
message at: 'H' put: 'coreHub'.
message at: 'M' put: 'SubscribeToExchangeDeltas'.
message at: 'A' put: {'BTC-ETH'}.
json := String streamContents: [ :stream |
	(NeoJSONWriter on: stream) nextPut: message ].
websocket sendText: json.

==>(in Transcript)
{"C":"d-64571BDB-LJ,1","S":1,"M":[]}
{}
{}
{}
{"C":"d-64571BDB-LJ,2|JE,D6D","G":"oS5hHv23WBWXIR","M":[]}
{"R":true,"I":"0"}
{"C":"d-64571BDB-LJ,2|JE,D6E","M":[{"H":"CoreHub","M":"updateExchangeState",
   "A":[{"MarketName":"BTC-ETH","Nounce":4326,
     "Buys":[
        {"Type":2,"Rate":0.10036454,"Quantity":23.77749657},
        {"Type":0,"Rate":0.09912933,"Quantity":33.90360000},
        {"Type":1,"Rate":0.09905617,"Quantity":0.0}],
     "Sells":[
        {"Type":0,"Rate":0.10069995,"Quantity":44.06500000},
        {"Type":1,"Rate":0.10092096,"Quantity":0.0}],
     "Fills":[
        {"OrderType":"SELL","Rate":0.10036454,"Quantity":1.30127600,"TimeStamp":"2018-02-10T17:47:42.933"}]
}]}]}
{"C":"d-64571BDB-LJ,2|JE,D6F","M":[{"H":"CoreHub","M":"updateExchangeState",
   "A":[{"MarketName":"BTC-ETH","Nounce":4327,
     "Buys":[
        {"Type":1,"Rate":0.09912933,"Quantity":0.0},
        {"Type":0,"Rate":0.09910001,"Quantity":33.04100000}],
     "Sells":[],
     "Fills":[]
 }]}]}
{"C":"d-64571BDB-LJ,2|JE,D70","M":[{"H":"CoreHub","M":"updateExchangeState",
   "A":[{"MarketName":"BTC-ETH","Nounce":4328,
     "Buys":[
        {"Type":2,"Rate":0.10036454,"Quantity":23.25325894},
        {"Type":1,"Rate":0.09956801,"Quantity":0.0},
        {"Type":0,"Rate":0.09920001,"Quantity":33.96970000},
        {"Type":1,"Rate":0.09910001,"Quantity":0.0},
        {"Type":0,"Rate":0.09360000,"Quantity":0.26403692}],
    "Sells":[
        {"Type":0,"Rate":0.10069994,"Quantity":0.99299630},
        {"Type":0,"Rate":0.10083024,"Quantity":14.27868176},
        {"Type":1,"Rate":0.10092095,"Quantity":0.0},
        {"Type":1,"Rate":0.10305599,"Quantity":0.0}],
     "Fills":[
        {"OrderType":"SELL","Rate":0.10036454,"Quantity":0.52423763,"TimeStamp":"2018-02-10T17:47:44.167"}]
}]}]}

WOOHOO!! Ready to roll…

Conclusion

Okay! Now I’m off for a bit to build what I learned into a library. I’ll report back here to link to it shortly.
