diff --git a/assistant.go b/assistant.go index 1849115..3e115b0 100644 --- a/assistant.go +++ b/assistant.go @@ -12,7 +12,7 @@ import ( tele "gopkg.in/telebot.v3" ) -var assistantWritingSign = "\n... 📝" +var assistantWritingSign = "\n...📝" func matchAssistantConversation(botUsr *tele.User, msg *tele.Message) []*tele.Message { // A thread must meet the following conditions to be considered a conversation with the assistant (for now): @@ -87,7 +87,7 @@ func matchAssistantConversation(botUsr *tele.User, msg *tele.Message) []*tele.Me return nil } -type assistantStreamedResponseCb func(text string, finished bool) error +type assistantStreamedResponseCb func(text string, finished bool) (*tele.Message, error) func assistantStreamedResponse(request openai.ChatRequest, cb assistantStreamedResponseCb) error { logger.Debugw("Openai chat request", "req", request) @@ -111,7 +111,7 @@ func assistantStreamedResponse(request openai.ChatRequest, cb assistantStreamedR nErrs := 0 go func() { respBuilder := strings.Builder{} - minWait := time.After(time.Second) + minWait := time.After(1 * time.Second) for { var ( nNewChunk int @@ -149,9 +149,9 @@ func assistantStreamedResponse(request openai.ChatRequest, cb assistantStreamedR } respoText := respBuilder.String() + assistantWritingSign - minWait = time.After(time.Second) // renew the timer + minWait = time.After(691 * time.Millisecond) // renew the timer - if err := cb(respoText, false); err != nil { + if _, err := cb(respoText, false); err != nil { logger.Warnw("failed to send partial update", "error", err) nErrs += 1 if nErrs > 3 { @@ -165,7 +165,7 @@ func assistantStreamedResponse(request openai.ChatRequest, cb assistantStreamedR } respText := respBuilder.String() - if err = cb(respText, true); err != nil { + if _, err = cb(respText, true); err != nil { logger.Warnw("assistant: failed to send message", "error", err) } }() @@ -213,7 +213,6 @@ func handleAssistantConversation(c tele.Context, thread []*tele.Message) error { }) } if len(convMsgs) == 0 { - // It turns out that this will never happen because Telegram splits messages when they exceed a certain length. return c.Reply("Your message is too long (Sorry!)") } for l := len(convMsgs) - 1; l >= 0; l-- { @@ -227,10 +226,14 @@ func handleAssistantConversation(c tele.Context, thread []*tele.Message) error { MaxTokens: 2048, } - typingNotifyCh := setTyping(c) + typingNotifyCh := make(chan struct{}) + go func() { + defer close(typingNotifyCh) + _ = c.Bot().Notify(lastMsg.Chat, tele.Typing) + }() var replyMsg *tele.Message - reqErr := assistantStreamedResponse(req, func(text string, finished bool) error { + reqErr := assistantStreamedResponse(req, func(text string, finished bool) (*tele.Message, error) { var err error if replyMsg == nil { <-typingNotifyCh @@ -244,7 +247,7 @@ func handleAssistantConversation(c tele.Context, thread []*tele.Message) error { logger.Warnw("failed to cache message", "error", err) } } - return err + return replyMsg, err }) if reqErr != nil { diff --git a/bot.go b/bot.go index d805bc9..fa7413f 100644 --- a/bot.go +++ b/bot.go @@ -117,12 +117,3 @@ func stickerFromID(id string) *tele.Sticker { }, } } - -func setTyping(c tele.Context) chan error { - resultCh := make(chan error, 1) - go func() { - defer close(resultCh) - resultCh <- c.Bot().Notify(c.Chat(), tele.Typing) - }() - return resultCh -} diff --git a/botcmd_translate.go b/botcmd_translate.go index 0ba1fbc..d46b5e7 100644 --- a/botcmd_translate.go +++ b/botcmd_translate.go @@ -3,6 +3,7 @@ package main import ( "regexp" "strings" + "time" "github.com/samber/lo" tele "gopkg.in/telebot.v3" @@ -71,6 +72,13 @@ func handleTranslateBtn(c tele.Context) error { return nil } + // pretend to be typing + if err := c.Bot().Notify(msg.Chat, tele.Typing); err != nil { + logger.Warnf("failed to send typing action: %v", err) + } + + ai := openai.NewClient(config.OpenAIApiKey) + req := openai.ChatRequest{ Model: openai.ModelGpt0305Turbo, Messages: []openai.ChatMessage{ @@ -85,27 +93,67 @@ func handleTranslateBtn(c tele.Context) error { }, Temperature: lo.ToPtr(0.2), } + logger.Debugf("Openai chat request: %#+v", req) - actionCh := setTyping(c) - err := assistantStreamedResponse(req, func(text string, finished bool) error { - var err error - <-actionCh - if finished { - retryBtn := translateBtnRetry - retryBtn.Data = targetLang - respMenu := &tele.ReplyMarkup{} - respMenu.Inline(respMenu.Row(retryBtn)) - - msg, err = c.Bot().Edit(msg, text, respMenu) - } else { - msg, err = c.Bot().Edit(msg, text) - } - return err - }) + resp, err := ai.ChatCompletionStream(req) if err != nil { logger.Errorf("failed to translate: req: %#+v, err: %v", req, err) _, _ = c.Bot().Reply(origMsg, stickerFromID(stickerPanic), tele.Silent) return err } - return nil + + respBuilder := strings.Builder{} + minWait := time.After(1 * time.Second) + for { + var ( + nNewChunk int + finished bool + minWaitSatisfied bool + ) + Drain: + for { + select { + case chunk, ok := <-resp.Stream: + if !ok { + finished = true + break Drain + } + nNewChunk += 1 + respBuilder.WriteString(chunk) + default: + if minWaitSatisfied { + break Drain + } + <-minWait + minWaitSatisfied = true + } + } + + if nNewChunk == 0 { + if chunk, ok := <-resp.Stream; !ok { + finished = true + } else { + respBuilder.WriteString(chunk) + } + } + if finished { + break + } + + respoText := respBuilder.String() + assistantWritingSign + minWait = time.After(691 * time.Millisecond) // renew the timer + if msg, err = c.Bot().Edit(msg, respoText, tele.Silent); err != nil { + logger.Warnf("failed to edit the temporary message: %v", err) + break + } + logger.Debugf("... message edited") + } + + respText := respBuilder.String() + retryBtn := translateBtnRetry + retryBtn.Data = targetLang + respMenu := &tele.ReplyMarkup{} + respMenu.Inline(respMenu.Row(retryBtn)) + _, err = c.Bot().Edit(msg, respText, tele.Silent, respMenu) + return err }