Files

89 lines
2.4 KiB
Go

package main
import (
"encoding/json"
"net/url"
"time"
"github.com/gorilla/websocket"
"github.com/rs/zerolog/log"
)
// stream connects to the public streaming endpoint of o.domain and keeps
// reconnecting until maxStreamFailures sessions have ended.
//
// Every return of streamProcess counts as a failure (it only returns when
// dialing or reading fails). Between attempts the worker waits retryDelay.
// After the final failure the function returns immediately: there is nothing
// left to retry, so it neither sleeps nor claims that it will retry.
//
// NOTE(review): the failure counter is never reset after a long-lived,
// healthy session, so three disconnects over any time span end streaming for
// this domain — confirm that this is intended.
func (o *DomainWorker) stream() {
	const (
		maxStreamFailures = 3
		retryDelay        = 30 * time.Second
	)

	u := url.URL{Scheme: "wss", Host: o.domain, Path: "/api/v1/streaming", RawQuery: "stream=public"}
	endpoint := u.String()

	for streamFailures := 1; ; streamFailures++ {
		o.streamProcess(endpoint)

		if streamFailures >= maxStreamFailures {
			// Last attempt: do not sleep or announce a retry that never happens.
			log.Warn().Str("Domain", o.domain).Msg("Stream: failed, giving up")
			return
		}

		log.Warn().Str("Domain", o.domain).Msg("Stream: failed, sleeping and retrying")
		time.Sleep(retryDelay)
	}
}
// unmarshalStreamPayload decodes the JSON document that a stream message
// carries, as a string, under the "payload" key.
//
// It returns the decoded object and true on success. If data["payload"] is
// absent or not a string, or if the string is not valid JSON for an object,
// the problem is logged for this domain and (nil, false) is returned.
func (o *DomainWorker) unmarshalStreamPayload(data map[string]interface{}) (map[string]interface{}, bool) {
	raw, isString := data["payload"].(string)
	if !isString {
		log.Error().Str("Domain", o.domain).Msg(`Stream: missing data["payload"] string`)
		return nil, false
	}

	// The payload is JSON nested inside JSON, so it needs a second decode.
	var decoded map[string]interface{}
	if decodeErr := json.Unmarshal([]byte(raw), &decoded); decodeErr != nil {
		log.Error().Str("Domain", o.domain).Err(decodeErr).Msg("Stream: Failed to unmarshal payload")
		return nil, false
	}

	return decoded, true
}
// streamProcess runs one websocket session against rawURL and dispatches
// every received message by its "event" field.
//
// It blocks until the session ends and returns only on failure: either the
// dial fails (after which it sleeps 30 seconds before returning) or a read
// from the connection fails. The connection is closed on return.
//
// Messages that are not valid JSON, or that lack a string "event" field, are
// logged and skipped; they do not terminate the session.
//
// NOTE(review): the caller, stream, also sleeps after every return, so a dial
// failure currently waits twice — confirm whether that is intended.
func (o *DomainWorker) streamProcess(rawURL string) {
	log.Info().Str("Domain", o.domain).Str("URL", rawURL).Msg("Stream: Connecting")

	c, _, err := websocket.DefaultDialer.Dial(rawURL, nil)
	if err != nil {
		log.Error().Str("Domain", o.domain).Err(err).Msg("Stream: dial")
		time.Sleep(30 * time.Second)
		return
	}
	defer c.Close()

	for {
		_, message, err := c.ReadMessage()
		if err != nil {
			log.Error().Str("Domain", o.domain).Err(err).Msg("Stream: read")
			return
		}

		var data map[string]interface{}
		if err := json.Unmarshal(message, &data); err != nil {
			// Report the real cause instead of a misleading "missing event".
			log.Error().Str("Domain", o.domain).Err(err).Bytes("data", message).Msg("Stream: Failed to unmarshal message")
			continue
		}

		event, ok := data["event"].(string)
		if !ok {
			log.Error().Str("Domain", o.domain).Bytes("data", message).Msg(`Stream: missing data["event"]`)
			continue
		}

		switch event {
		case "update":
			o.processStatus(data)
		case "notification":
			// The event name has no trailing colon; the previous label
			// "notification:" could never match it.
			payload, ok := o.unmarshalStreamPayload(data)
			if !ok {
				continue
			}
			log.Info().Str("Domain", o.domain).Interface("notification", payload).Msg("Stream: notification")
		default:
			log.Info().Msgf("Stream recv %s: %v\n", event, data)
		}
	}
}
// processStatus handles one "update" stream message.
//
// It decodes the message payload, takes the payload's "account" object,
// sends it together with the worker's domain on o.hub.CreateUser, and then
// passes the payload to processMentions. If the payload cannot be decoded,
// or "account" is not a JSON object, the message is dropped (the latter case
// is logged here; decode failures are logged by unmarshalStreamPayload).
//
// NOTE(review): the send on o.hub.CreateUser may block depending on how the
// channel is created and drained — not visible from this function.
func (o *DomainWorker) processStatus(data map[string]interface{}) {
	status, decoded := o.unmarshalStreamPayload(data)
	if !decoded {
		return
	}

	author, isObject := status["account"].(map[string]interface{})
	if !isObject {
		log.Error().Str("Domain", o.domain).Msg("Stream: Type Assert account")
		return
	}

	// Hand the author to the hub first, then process the status' mentions.
	req := CreateUserReq{author, o.domain}
	o.hub.CreateUser <- req
	o.processMentions(status)

	log.Info().Str("Domain", o.domain).Msg("Stream: update")
}