89 lines
2.4 KiB
Go
89 lines
2.4 KiB
Go
package main
|
|
|
|
import (
|
|
"encoding/json"
|
|
"net/url"
|
|
"time"
|
|
|
|
"github.com/gorilla/websocket"
|
|
"github.com/rs/zerolog/log"
|
|
)
|
|
|
|
func (o *DomainWorker) stream() {
|
|
u := url.URL{Scheme: "wss", Host: o.domain, Path: "/api/v1/streaming", RawQuery: "stream=public"}
|
|
for streamFailures := 0; streamFailures < 3; streamFailures++ {
|
|
o.streamProcess(u.String())
|
|
log.Warn().Str("Domain", o.domain).Msg("Stream: failed, sleeping and retrying")
|
|
time.Sleep(30 * time.Second)
|
|
}
|
|
}
|
|
|
|
func (o *DomainWorker) unmarshalStreamPayload(data map[string]interface{}) (map[string]interface{}, bool) {
|
|
payloadStr, ok := data["payload"].(string)
|
|
if !ok {
|
|
log.Error().Str("Domain", o.domain).Msg(`Stream: missing data["payload"] string`)
|
|
return nil, false
|
|
}
|
|
var payload map[string]interface{}
|
|
err := json.Unmarshal([]byte(payloadStr), &payload)
|
|
if err != nil {
|
|
log.Error().Str("Domain", o.domain).Err(err).Msg("Stream: Failed to unmarshal payload")
|
|
return nil, false
|
|
}
|
|
return payload, true
|
|
}
|
|
|
|
func (o *DomainWorker) streamProcess(url string) {
|
|
log.Info().Str("Domain", o.domain).Str("URL", url).Msg("Stream: Connecting")
|
|
c, _, err := websocket.DefaultDialer.Dial(url, nil)
|
|
if err != nil {
|
|
log.Error().Err(err).Msg("Stream: dial")
|
|
time.Sleep(30 * time.Second)
|
|
return
|
|
}
|
|
defer c.Close()
|
|
|
|
for {
|
|
_, message, err := c.ReadMessage()
|
|
if err != nil {
|
|
log.Error().Err(err).Msg("Stream: read")
|
|
return
|
|
}
|
|
var data map[string]interface{}
|
|
err = json.Unmarshal(message, &data)
|
|
event, ok := data["event"].(string)
|
|
if !ok {
|
|
log.Error().Str("Domain", o.domain).Bytes("data", message).Msg(`Stream: missing data["event"]`)
|
|
continue
|
|
}
|
|
|
|
switch event {
|
|
case "update":
|
|
o.processStatus(data)
|
|
case "notification:":
|
|
payload, ok := o.unmarshalStreamPayload(data)
|
|
if !ok {
|
|
continue
|
|
}
|
|
log.Info().Str("Domain", o.domain).Interface("notification", payload).Msg("Stream: notification")
|
|
default:
|
|
log.Info().Msgf("Stream recv %s: %v\n", event, data)
|
|
}
|
|
}
|
|
}
|
|
|
|
func (o *DomainWorker) processStatus(data map[string]interface{}) {
|
|
payload, ok := o.unmarshalStreamPayload(data)
|
|
if !ok {
|
|
return
|
|
}
|
|
account, ok := payload["account"].(map[string]interface{})
|
|
if !ok {
|
|
log.Error().Str("Domain", o.domain).Msg("Stream: Type Assert account")
|
|
return
|
|
}
|
|
o.hub.CreateUser <- CreateUserReq{account, o.domain}
|
|
o.processMentions(payload)
|
|
log.Info().Str("Domain", o.domain).Msg("Stream: update")
|
|
}
|