File size: 2,306 Bytes
360d784
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
@Time    : 2023/5/11 14:42
@Author  : alexanderwu
@File    : manager.py
"""
from metagpt.llm import LLM
from metagpt.logs import logger
from metagpt.schema import Message


class Manager:
    def __init__(self, llm: LLM = LLM()):
        self.llm = llm  # Large Language Model
        self.role_directions = {
            "BOSS": "Product Manager",
            "Product Manager": "Architect",
            "Architect": "Engineer",
            "Engineer": "QA Engineer",
            "QA Engineer": "Product Manager"
        }
        self.prompt_template = """
        Given the following message:
        {message}

        And the current status of roles:
        {roles}

        Which role should handle this message?
        """

    async def handle(self, message: Message, environment):
        """
        管理员处理信息,现在简单的将信息递交给下一个人
        The administrator processes the information, now simply passes the information on to the next person
        :param message:
        :param environment:
        :return:
        """
        # Get all roles from the environment
        roles = environment.get_roles()
        # logger.debug(f"{roles=}, {message=}")

        # Build a context for the LLM to understand the situation
        # context = {
        #     "message": str(message),
        #     "roles": {role.name: role.get_info() for role in roles},
        # }
        # Ask the LLM to decide which role should handle the message
        # chosen_role_name = self.llm.ask(self.prompt_template.format(context))

        # FIXME: 现在通过简单的字典决定流向,但之后还是应该有思考过程
        #The direction of flow is now determined by a simple dictionary, but there should still be a thought process afterwards
        next_role_profile = self.role_directions[message.role]
        # logger.debug(f"{next_role_profile}")
        for _, role in roles.items():
            if next_role_profile == role.profile:
                next_role = role
                break
        else:
            logger.error(f"No available role can handle message: {message}.")
            return

        # Find the chosen role and handle the message
        return await next_role.handle(message)