多智能体粒子环境(Multi-Agent Particle Env)食用指南--从入门到入土
<h2 id="0项目地址">0.项目地址:</h2><blockquote>
<p>原地址:openai/multiagent-particle-envs: Code for a multi-agent particle environment used in the paper "Multi-Agent Actor-Critic for Mixed Cooperative-Competitive Environments" (github.com)</p>
<p>国内镜像:项目首页 - multiagent-particle-envs:Code for a multi-agent particle environment used in the paper "Multi-Agent Actor-Critic for Mixed Cooperative-Competitive Environments" - GitCode</p>
</blockquote>
<h3 id="环境列表">环境列表</h3>
<table>
<thead>
<tr>
<th style="text-align: center">代码中的环境名称</th>
<th>沟通</th>
<th style="text-align: center">竞争</th>
<th style="text-align: center">描述</th>
</tr>
</thead>
<tbody>
<tr>
<td style="text-align: center">simple</td>
<td>N</td>
<td style="text-align: center">N</td>
<td style="text-align: center">单个智能体看到地标位置,根据它与地标的接近程度进行奖励。不是多智能体环境——用于调试策略。</td>
</tr>
<tr>
<td style="text-align: center">simple_adversary.py</td>
<td>(物理欺骗)</td>
<td style="text-align: center">N</td>
<td style="text-align: center">是 1 个对手(红色),N 个优秀智能体(绿色),N 个地标(通常 N=2)。所有智能体都会观察地标和其他智能体的位置。一个地标是“目标地标”(绿色)。好的智能体根据其中一个与目标地标的接近程度进行奖励,但如果对手靠近目标地标,则获得负面奖励。对手根据它与目标的接近程度获得奖励,但它不知道哪个地标是目标地标。因此,优秀的智能体必须学会“拆分”并覆盖所有地标以欺骗对手。</td>
</tr>
<tr>
<td style="text-align: center">simple_crypto.py (秘密交流)</td>
<td>是</td>
<td style="text-align: center">是</td>
<td style="text-align: center">两个好智能体人(爱丽丝和鲍勃),一个对手(夏娃)。Alice 必须通过公共频道向 bob 发送私人消息。Alice 和 bob 会根据 bob 重建消息的程度获得奖励,但如果 eve 能够重建消息,则获得负面奖励。Alice 和 bob 有一个私钥(在每集开始时随机生成),他们必须学会使用它来加密消息。</td>
</tr>
<tr>
<td style="text-align: center">simple_push.py (远离)</td>
<td>N</td>
<td style="text-align: center">是</td>
<td style="text-align: center">1 个智能体、1 个对手、1 个地标。智能体根据与地标的距离进行奖励。如果对手靠近地标,并且智能体远离地标,则它会得到奖励。因此,对手学会将智能体推离地标。</td>
</tr>
<tr>
<td style="text-align: center">simple_reference.py</td>
<td>是</td>
<td style="text-align: center">N</td>
<td style="text-align: center">2 个智能体,3 个不同颜色的地标。每个智能体都想到达他们的目标地标,只有其他智能体知道。奖励是集体的。因此,智能体必须学会传达另一个智能体的目标,并导航到他们的地标。这与 simple_speaker_listener 场景相同,其中两个智能体同时是说话者和听众。</td>
</tr>
<tr>
<td style="text-align: center">simple_speaker_listener.py (合作交流)</td>
<td>是</td>
<td style="text-align: center">N</td>
<td style="text-align: center">与 simple_reference 相同,除了一个智能体是不动的“说话者”(灰色)(观察其他智能体的目标),另一个智能体是听者(不能说话,但必须导航到正确的地标)。</td>
</tr>
<tr>
<td style="text-align: center">simple_spread.py (合作导航)</td>
<td>N</td>
<td style="text-align: center">N</td>
<td style="text-align: center">N 个智能体,N 个地标。根据任何智能体与每个地标的距离对智能体进行奖励。如果智能体与其他智能体发生冲突,则会受到惩罚。因此,智能体必须学会在避免碰撞的同时覆盖所有地标。</td>
</tr>
<tr>
<td style="text-align: center"><mark>simple_tag.py (捕食者-猎物)</mark></td>
<td>N</td>
<td style="text-align: center">是</td>
<td style="text-align: center">捕食者-猎物环境。好的智能体(绿色)速度更快,并且希望避免被对手(红色)击中。对手速度较慢,并希望打击优秀的智能体。障碍物(大黑圈)挡住了去路。</td>
</tr>
<tr>
<td style="text-align: center">simple_world_comm.py</td>
<td>是</td>
<td style="text-align: center">是</td>
<td style="text-align: center">在论文随附的视频中看到的环境。与 simple_tag 相同,除了 (1) 有食物(蓝色小球),好智能体会因为靠近而获得奖励,(2)我们现在有“森林”,可以将智能体隐藏在里面,从外面看不到;(3)有一个“领导对手”,可以随时看到智能体人,并可以与其他对手沟通,帮助协调追击。</td>
</tr>
</tbody>
</table>
<hr>
<blockquote>
<p>如果从头开始就按下面步骤来,不要跳。</p>
</blockquote>
<h2 id="1创建虚拟环境">1.创建虚拟环境</h2>
<p>指令格式:<code>conda create -n env_name python=x.x</code></p>
<p>输入:</p>
<pre><code class="language-bash">conda create -n mpe python=3.6
</code></pre>
<p>安装环境默认路径在 Anaconda 目录下的 envs 里面,如图:一路点Yes</p>
<img src="https://gitee.com/dragonpig7/picgo/raw/master/img/202503151318620.png" alt="202503151249754">
<h2 id="2激活虚拟环境">2.激活虚拟环境</h2>
<pre><code class="language-bash">conda activate mpe
</code></pre>
<p><strong>从base环境进入mpe项目环境,后面的依赖包会下载到项目环境里,避免与其他项目冲突出现版本问题。</strong></p>
<p><img src="https://gitee.com/dragonpig7/picgo/raw/master/img/202503151318418.png" alt="image-20250304140317206" loading="lazy"></p>
<h2 id="3下载相关依赖包">3.下载相关依赖包</h2>
<p><strong>注意版本问题,新版本gym会报错“prng模块的缺失”,pyglet版本太高也会报错。</strong></p>
<p>依赖包版本如下(能正常跑的):</p>
<blockquote>
<p>Python =3.6 gym=0.10.5 tensorflow = 1.14.0 numpy =1.19.5 pyglet = 1.5.9</p>
</blockquote>
<p>输入:</p>
<pre><code class="language-bash">pip install gym==0.10.5 tensorflow==1.14.0 pyglet==1.5.9
</code></pre>
<h2 id="4安装openai的multiagent-particle-envs">4.安装openAI的<strong>Multiagent-particle-envs</strong></h2>
<p>进入“<code>multiagent-particle-envs</code>”目录,安装环境(最后的点.不要漏了):</p>
<pre><code class="language-bash">pip install -e.
</code></pre>
<p><img src="https://gitee.com/dragonpig7/picgo/raw/master/img/202503151318375.png" alt="image-20250304141243909" loading="lazy"></p>
<h2 id="5测试环境">5.测试环境:</h2>
<pre><code class="language-bash">python bin/interactive.py --scenario simple.py
</code></pre>
<p>成功的话得到如下画面:</p>
<p><img src="https://gitee.com/dragonpig7/picgo/raw/master/img/202503151318621.png" alt="image-20250304142227317" loading="lazy"></p>
<p>说明包版本啥的没有问题,可以继续。</p>
<h2 id="6-安装openai的maddpg算法">6. 安装openAI的maddpg算法</h2>
<ul>
<li>
<p>下载克隆maddpg开源项目文件openai/maddpggithub.com到mpe的同一目录下,目录结构如图:</p>
<p><img src="https://gitee.com/dragonpig7/picgo/raw/master/img/202503151318925.png" alt="image-20250304145622996" loading="lazy"></p>
</li>
<li>
<p>进入<code>maddgp</code>目录:</p>
</li>
</ul>
<p><img src="https://gitee.com/dragonpig7/picgo/raw/master/img/202503151319275.png" alt="image-20250304145715412" loading="lazy"></p>
<ul>
<li>安装</li>
</ul>
<pre><code class="language-python">pip install -e.
</code></pre>
<ul>
<li>
<p>测试</p>
<p>安装完成后,输入如下代码进行测试</p>
<pre><code class="language-bash">cd experiments
python train.py --scenario simple
</code></pre>
<p>以下画面说明成功:按<code>ctrl+c</code>命令行终止</p>
<p><img src="https://gitee.com/dragonpig7/picgo/raw/master/img/202503151319739.png" alt="image-20250304150050917" loading="lazy"></p>
<ul>
<li>开可视化</li>
</ul>
<p>找到<code>maddpg</code>-><code>experiments</code>-><code>train.py</code>中,找到<code>display</code>可视化属性,改为<code>True</code>即可看见训练过程</p>
<p><img src="https://gitee.com/dragonpig7/picgo/raw/master/img/202503151319444.png" alt="image-20250304145833270" loading="lazy"></p>
<p>如下图说明成功:</p>
</li>
</ul>
<p><img src="https://gitee.com/dragonpig7/picgo/raw/master/img/202503151319820.png" alt="image-20250304145507248" loading="lazy"></p>
<h2 id="7切换其他环境">7.切换其他环境</h2>
<p>在<code>maddpg/experiments</code>文件夹下运行如下代码:</p>
<pre><code class="language-bash">cd experiments
python train.py --scenario simple_tag
</code></pre>
<p><img src="https://gitee.com/dragonpig7/picgo/raw/master/img/202503151319609.png" alt="image-20250304150951295" loading="lazy"></p>
<p>如果要可视化,后面加上<code>display</code>属性:</p>
<pre><code class="language-bash">python train.py --scenario simple_tag --display
</code></pre>
<p><img src="https://gitee.com/dragonpig7/picgo/raw/master/img/202503151319627.png" alt="image-20250304151112007" loading="lazy"></p>
<p>训练完:默认episodes: 60000</p>
<p><img src="https://gitee.com/dragonpig7/picgo/raw/master/img/202503151319416.png" alt="image-20250304192504899" loading="lazy"></p>
<p>注意:第一次训练时display一定得设置为false,第二次运行train.py时才能导入第一次跑完存储的模型进行可视化。</p>
<p><code>--display</code>: 展示训练结果, <strong>但不继续训练</strong> (默认: <code>False</code>)</p>
<p>不然会报错:</p>
<p><img src="https://gitee.com/dragonpig7/picgo/raw/master/img/202503151319163.png" alt="image-20250304222434354" loading="lazy"></p>
<blockquote>
<p>以上就完成了环境配置。其他的遇到文件夹路径、包导入之类的基本问题,可以自行查阅解决。</p>
</blockquote>
<hr>
<h2 id="8命令参数说明">8.命令参数说明</h2>
<blockquote>
<p>主要剖析simple_tag环境</p>
</blockquote>
<h3 id="环境选项">环境选项</h3>
<ul>
<li><code>--scenario</code>:选择多智能体环境场景脚本名称(如simple_tag.py)(默认: <code>"simple"</code>)</li>
<li><code>--max-episode-len</code> :单个episode的最大步长,超过此步长会强制终止 (默认: <code>25</code>)</li>
<li><code>--num-episodes</code> :总训练episode数量 (默认: <code>60000</code>)</li>
<li><code>--num-adversaries</code>: 环境中的adversary数量 (默认: <code>0</code>)(需与场景脚本中定义的一致)</li>
<li><code>--good-policy</code>: 环境中good policy算法(默认: <code>"maddpg"</code>; 选项: {<code>"maddpg"</code>, <code>"ddpg"</code>})</li>
<li><code>--adv-policy</code>: 环境中adversary policy算法(默认: <code>"maddpg"</code>; 选项: {<code>"maddpg"</code>, <code>"ddpg"</code>})</li>
</ul>
<blockquote>
<h4 id="关键点"><mark>关键点</mark>:</h4>
<img src="https://gitee.com/dragonpig7/picgo/raw/master/img/202503151319062.png" alt="image-20250306150126724" style="zoom: 150%">
<ul>
<li><code>--num-adversaries</code> 必须与场景脚本(如 <code>simple_tag.py</code>)中定义的对抗者数量一致,否则策略分配会出错,导致曲线收敛可能达不到预期效果。</li>
<li><code>--good-policy</code> 和 <code>--adv-policy</code> 指定不同类别智能体的算法,默认为 <code>maddpg</code>。</li>
</ul>
<p><img src="https://gitee.com/dragonpig7/picgo/raw/master/img/202503151319178.png" alt="image-20250306195458885" loading="lazy"></p>
</blockquote>
<h3 id="核心训练参数">核心训练参数</h3>
<ul>
<li><code>--lr</code>: Adam优化器的学习率 (默认: <code>1e-2</code>),如果学习率过高,可能导致策略更新不稳定;过低则学习缓慢。</li>
<li><code>--gamma</code>: 奖励折扣因子(discount factor) (默认: <code>0.95</code>),这可能影响长期奖励的累积。如果任务需要更长期的规划,可能需要更高的gamma。</li>
<li><code>--batch-size</code>: 从经验回放池中采样的批量大小 (默认: <code>1024</code>),较大的批次可能影响更新的稳定性,尤其是在初期训练阶段。</li>
<li><code>--num-units</code>: 神经网络隐藏层的单元数 (默认: <code>64</code>)</li>
</ul>
<h3 id="保存">保存</h3>
<ul>
<li><code>--exp-name</code>: 实验名称,用于保存结果的文件名前缀 (默认: <code>None</code>)</li>
<li><code>--save-dir</code>: 模型保存目录 (默认: <code>"/tmp/policy/"</code>)</li>
<li><code>--save-rate</code>:每完成多少个episode保存一次模型 (默认: <code>1000</code>)</li>
<li><code>--load-dir</code>: 预训练模型加载目录 (默认: <code>""</code>)</li>
</ul>
<h3 id="评估">评估</h3>
<ul>
<li><code>--restore</code>: 恢复在<code>load-dir</code>的训练结果, 并且继续训练 (默认: <code>False</code>)</li>
<li><code>--display</code>: 是否实时渲染环境(训练时可视化), 但不继续训练 (默认: <code>False</code>)</li>
<li><code>--benchmark</code>: 是否进入评估模式(不训练,仅测试策略性能,保存结果到 <code>benchmark-dir</code> 文件夹 (默认: <code>False</code>)</li>
<li><code>--benchmark-iters</code>: 执行基准评估的训练周期 (默认: <code>100000</code>)</li>
<li><code>--benchmark-dir</code>: 存放基准数据的目录 (默认: <code>"./benchmark_files/"</code>)</li>
<li><code>--plots-dir</code>: 存放训练曲线的目录 (默认: <code>"./learning_curves/"</code>)</li>
</ul>
<h4 id="1-none_rewardspkl"><strong>(1) <code>None_rewards.pkl</code></strong></h4>
<ul>
<li><strong>数据类型</strong>:列表(<code>final_ep_rewards</code>)。</li>
<li><strong>具体内容</strong>:
<ul>
<li>每个元素表示训练过程中 <strong>每间隔 <code>save_rate</code> 个 episodes 的平均总奖励</strong>。</li>
<li>例如,若 <code>save_rate=100</code>,则列表中第 <code>i</code> 个元素对应第 <code>i*100</code> 个 episodes 的平均总奖励。</li>
</ul>
</li>
<li><strong>用途</strong>:用于绘制 <strong>全局学习曲线</strong>,反映整体策略的收敛性和性能变化。</li>
</ul>
<h4 id="2-none_agrewardspkl"><strong>(2) <code>None_agrewards.pkl</code></strong></h4>
<ul>
<li>
<p><strong>数据类型</strong>:列表(<code>final_ep_ag_rewards</code>)。</p>
</li>
<li>
<p><strong>具体内容</strong>:</p>
<ul>
<li>
<p>每个元素表示训练过程中 <strong>每个智能体在间隔 <code>save_rate</code> 个 episodes 内的平均奖励</strong>。</p>
</li>
<li>
<p>例如,若有 3 个智能体,<code>save_rate=100</code>,则列表中元素顺序为:</p>
<pre><code>[智能体1的第100轮平均奖励, 智能体2的第100轮平均奖励, 智能体3的第100轮平均奖励, 智能体1的第200轮平均奖励, ...]
</code></pre>
</li>
</ul>
</li>
<li>
<p><strong>用途</strong>:用于分析 <strong>各智能体的独立学习曲线</strong>,观察协作或竞争行为对个体奖励的影响。</p>
</li>
</ul>
<h2 id="demo">Demo</h2>
<ul>
<li>
<p>进行训练<br>
<code>python train.py --scenario simple_push --num-episodes 1000000 --exp-name exp1 --save-dir dir</code></p>
</li>
<li>
<p>训练结果可视化<br>
<code>python train.py --scenario simple_push --load-dir dir --display</code></p>
</li>
<li>
<p>继续训练<br>
<code>python train.py --scenario simple_push --load-dir dir --restore --num-episodes 80000</code></p>
</li>
</ul>
<h2 id="9代码注释">9.代码注释</h2>
<h3 id="训练文件trainpy">训练文件<code>train.py</code></h3>
<h4 id="1-神经网络模型定义-mlp_model">1. 神经网络模型定义 (<code>mlp_model</code>)</h4>
<pre><code class="language-python">def mlp_model(input, num_outputs, scope, reuse=False, num_units=64, rnn_cell=None):
"""
定义一个2层全连接神经网络
:param input: 输入张量(观测状态)
:param num_outputs: 输出层维度(对应动作空间)
:param scope: 变量作用域名称(用于区分不同Agent的网络)
:param reuse: 是否重用变量(用于共享参数)
:param num_units: 隐藏层单元数(通过--num-units参数指定)
"""
with tf.variable_scope(scope, reuse=reuse):
out = layers.fully_connected(input, num_units, tf.nn.relu)# 第一层:64单元,ReLU激活
out = layers.fully_connected(out, num_units, tf.nn.relu) # 第二层:64单元,ReLU激活
out = layers.fully_connected(out, num_outputs, None) # 输出层:线性激活
return out
</code></pre>
<p><strong>结构示意图</strong>:</p>
<pre><code class="language-python">输入层(obs_dim) -> 隐藏层(64) -> 隐藏层(64) -> 输出层(action_dim)
</code></pre>
<h4 id="2-环境创建-make_env"><strong>2. 环境创建 (<code>make_env</code>)</strong></h4>
<pre><code class="language-python">def make_env(scenario_name, arglist, benchmark=False):
"""
根据场景名称创建多智能体环境
:param scenario_name: 场景脚本名称(如simple_tag)
:param arglist: 命令行参数
:param benchmark: 是否为评估模式(影响环境是否返回基准数据)
"""
from multiagent.environment import MultiAgentEnv
import multiagent.scenarios as scenarios
# 动态加载场景脚本(如simple_tag.py)
scenario = scenarios.load(scenario_name + ".py").Scenario()
world = scenario.make_world()# 调用场景的make_world方法创建世界
# 根据模式创建环境
if benchmark:
env = MultiAgentEnv(world, scenario.reset_world, scenario.reward,
scenario.observation, scenario.benchmark_data)
else:
env = MultiAgentEnv(world, scenario.reset_world, scenario.reward,
scenario.observation)
return env
</code></pre>
<hr>
<h4 id="3-智能体训练器初始化-get_trainers"><strong>3. 智能体训练器初始化 (<code>get_trainers</code>)</strong></h4>
<pre><code class="language-python">def get_trainers(env, num_adversaries, obs_shape_n, arglist):
"""
为每个智能体创建对应的训练器(MADDPGAgentTrainer)
:param env: 环境对象
:param num_adversaries: 对抗者数量(通过--num-adversaries指定)
:param obs_shape_n: 所有智能体的观测空间形状列表
:param arglist: 命令行参数
"""
trainers = []
model = mlp_model# 使用的神经网络模型
# 为对抗者创建训练器(使用adv-policy参数)
for i in range(num_adversaries):
trainers.append(MADDPGAgentTrainer(
name="agent_%d" % i,
model=model,
obs_shape=obs_shape_n,
act_space=env.action_space,
agent_index=i,
arglist=arglist,
local_q_func=(arglist.adv_policy == 'ddpg')# 若为DDPG则使用局部Q函数
))
# 为合作者创建训练器(使用good-policy参数)
for i in range(num_adversaries, env.n):
trainers.append(MADDPGAgentTrainer(
name="agent_%d" % i,
model=model,
obs_shape=obs_shape_n,
act_space=env.action_space,
agent_index=i,
arglist=arglist,
local_q_func=(arglist.good_policy == 'ddpg')
))
return trainers
</code></pre>
<p><strong>关键逻辑</strong>:</p>
<ul>
<li>前 <code>num_adversaries</code> 个Agent被标记为对抗者,使用 <code>adv-policy</code> 参数指定的算法。</li>
<li>剩余Agent为合作者,使用 <code>good-policy</code> 参数。</li>
</ul>
<hr>
<h4 id="4-主训练循环-train"><strong>4. 主训练循环 (<code>train</code>)</strong></h4>
<pre><code class="language-python">def train(arglist):
with U.single_threaded_session():# 创建TensorFlow单线程会话
# 环境初始化
env = make_env(arglist.scenario, arglist)
obs_shape_n = .shape for i in range(env.n)]
# 训练器初始化(区分对抗者和合作者)
num_adversaries = min(env.n, arglist.num_adversaries)
trainers = get_trainers(env, num_adversaries, obs_shape_n, arglist)
# TensorFlow变量初始化
U.initialize()
# 经验回放相关变量
episode_rewards = # 累计奖励
agent_rewards = [ for _ in range(env.n)]# 每个Agent的独立奖励
# 主循环
obs_n = env.reset()
episode_step = 0
train_step = 0
while True:
# 1. 获取动作
action_n =
# 2. 环境交互
new_obs_n, rew_n, done_n, info_n = env.step(action_n)
episode_step += 1
# 3. 存储经验
for i, agent in enumerate(trainers):
agent.experience(obs_n, action_n, rew_n,
new_obs_n, done_n, terminal)
# 4. 更新观察状态
obs_n = new_obs_n
# 5. 累计奖励记录
for i, rew in enumerate(rew_n):
episode_rewards[-1] += rew
agent_rewards[-1] += rew
# 6. Episode终止判断
if done or (episode_step >= arglist.max_episode_len):
obs_n = env.reset()
episode_step = 0
episode_rewards.append(0)
for a in agent_rewards:
a.append(0)
# 7. 网络更新(非评估模式下)
if not (arglist.display or arglist.benchmark):
for agent in trainers:
agent.preupdate()# 准备更新(如清空梯度)
for agent in trainers:
loss = agent.update(trainers, train_step)# 执行MADDPG的Actor-Critic更新
# 8. 定期保存模型
if terminal and (len(episode_rewards) % arglist.save_rate == 0):
U.save_state(arglist.save_dir, saver=saver)
print(f"当前进度: {len(episode_rewards)} episodes, 平均奖励: {np.mean(episode_rewards[-arglist.save_rate:])}")
# 9. 终止条件
if len(episode_rewards) > arglist.num_episodes:
# 保存最终奖励数据
with open(os.path.join(arglist.plots_dir, f"{arglist.exp_name}_rewards.pkl"), 'wb') as f:
pickle.dump(final_ep_rewards, f)
break
</code></pre>
<hr>
<h4 id="关键问题解答对抗者是否生效"><strong>关键问题解答:对抗者是否生效?</strong></h4>
<p>在 <code>simple_tag.py</code> 中定义的 <code>num_adversaries = 3</code> 需要与启动命令中的 <code>--num-adversaries 3</code> 匹配:</p>
<pre><code class="language-bash"># 正确启动命令(必须显式指定)
python train.py --scenario simple_tag --num-adversaries 3 --good-policy maddpg --adv-policy maddpg
</code></pre>
<ul>
<li><strong>若未指定</strong>:训练器会将所有Agent视为合作者,导致对抗者策略错误。</li>
<li><strong>正确指定时</strong>:第一个Agent使用对抗者策略,其余为合作者策略。</li>
</ul>
<hr>
<h3 id="simple_tag文件">simple_tag文件</h3>
<h3 id="1-场景基类与核心定义"><strong>1. 场景基类与核心定义</strong></h3>
<pre><code class="language-python">from multiagent.core import World, Agent, Landmark
from multiagent.scenario import BaseScenario
class Scenario(BaseScenario):
"""
多智能体对抗场景基类,继承自 BaseScenario
核心功能:定义世界属性、智能体行为、奖励机制和观测空间
"""
</code></pre>
<h3 id="2-世界构建方法-make_world"><strong>2. 世界构建方法 <code>make_world</code></strong></h3>
<pre><code class="language-python"> def make_world(self):
world = World()# 创建世界对象
# --- 世界属性设置 ---
world.dim_c = 2# 通信维度(智能体间传递信息的向量长度)
# --- 智能体数量配置 ---
num_good_agents = 1 # 合作者(绿色)数量
num_adversaries = 3 # 对抗者(红色)数量
num_agents = num_adversaries + num_good_agents# 总智能体数 = 3+1=4
num_landmarks = 1 # 地标(障碍物)数量
# --- 初始化智能体 ---
world.agents = # 创建智能体列表
for i, agent in enumerate(world.agents):
agent.name = 'agent %d' % i # 智能体名称(agent 0~3)
agent.collide = True # 是否允许碰撞(True=实体碰撞生效)
agent.silent = True # 是否静默(True=不发送通信信号)
agent.adversary = True if i < num_adversaries else False# 前3个为对抗者
# --- 物理属性 ---
agent.size = 0.075 if agent.adversary else 0.05# 对抗者尺寸稍大
agent.accel = 4.0 # 加速度(控制移动灵敏度的参数,值越大响应越快)
agent.max_speed = 1.3# 最大移动速度(单位:仿真环境坐标系/步)
# !! 注意:以下代码存在问题,会导致覆盖已创建的智能体 !!
# 正确做法应直接修改已存在智能体的属性,而非重新创建
for i in range(num_adversaries):
agent = Agent()# 错误:这里重新创建了新的智能体实例
agent.adversary = True
agent.max_speed = 1.0 + 0.2 * i# 意图差异化速度但未正确实现
agent.accel = 3.0 + 0.5 * i
# --- 地标(障碍物)初始化 ---
world.landmarks =
for i, landmark in enumerate(world.landmarks):
landmark.name = 'landmark %d' % i
landmark.collide = True # 地标是否可碰撞(True=智能体会被阻挡)
landmark.movable = False # 地标是否可移动
landmark.size = 0.2 # 地标尺寸(大于智能体尺寸,形成障碍)
landmark.boundary = False# 是否作为边界(False=普通障碍物)
self.reset_world(world)# 调用重置方法初始化状态
return world
</code></pre>
<h3 id="3-世界重置方法-reset_world"><strong>3. 世界重置方法 <code>reset_world</code></strong></h3>
<pre><code class="language-python"> def reset_world(self, world):
# --- 智能体颜色设置 ---
for i, agent in enumerate(world.agents):
# 对抗者红色 ,合作者绿色
agent.color = np.array() if not agent.adversary else np.array()
# --- 地标颜色设置(灰色)---
for landmark in world.landmarks:
landmark.color = np.array()
# --- 随机初始位置与速度 ---
for agent in world.agents:
agent.state.p_pos = np.random.uniform(-1, +1, world.dim_p)# 位置随机
agent.state.p_vel = np.zeros(world.dim_p)# 初始速度归零
agent.state.c = np.zeros(world.dim_c) # 通信信号归零
# 地标随机位置(边界内)
for landmark in world.landmarks:
if not landmark.boundary:
landmark.state.p_pos = np.random.uniform(-0.9, +0.9, world.dim_p)
landmark.state.p_vel = np.zeros(world.dim_p)
</code></pre>
<hr>
<h3 id="4-奖励函数设计"><strong>4. 奖励函数设计</strong></h3>
<h4 id="合作者奖励-agent_reward"><strong>合作者奖励 <code>agent_reward</code></strong></h4>
<pre><code class="language-python"> def agent_reward(self, agent, world):
rew = 0# 初始化奖励
adversaries = self.adversaries(world)# 获取所有对抗者
# --- 碰撞惩罚 ---
if agent.collide:
for a in adversaries:
if self.is_collision(a, agent):
rew -= 10# 被对抗者碰撞一次扣10分
# --- 边界惩罚 ---
def bound(x):
""" 越界惩罚函数,防止智能体逃离战场 """
if x < 0.9: return 0
if x < 1.0: return (x - 0.9) * 10# 接近边界时线性惩罚
return min(np.exp(2 * x - 2), 10) # 超出边界时指数惩罚
for p in range(world.dim_p):
x = abs(agent.state.p_pos)# 检查每个坐标轴是否越界
rew -= bound(x)
return rew
</code></pre>
<h4 id="对抗者奖励-adversary_reward"><strong>对抗者奖励 <code>adversary_reward</code></strong></h4>
<pre><code class="language-python"> def adversary_reward(self, agent, world):
rew = 0
agents = self.good_agents(world)# 获取合作者(此处只有1个)
# --- 基于距离的奖励塑形(可选)---
if shape:# 当shape=True时启用
for adv in self.adversaries(world):
# 计算与最近合作者的距离,距离越近奖励越高(负值越小)
min_dist = min()
rew -= 0.1 * min_dist
# --- 捕获奖励 ---
if agent.collide:
for ag in agents:
if self.is_collision(ag, agent):
rew += 10# 成功捕获合作者加10分
return rew
</code></pre>
<hr>
<h3 id="5-观测空间构建-observation"><strong>5. 观测空间构建 <code>observation</code></strong></h3>
<pre><code class="language-python"> def observation(self, agent, world):
# --- 实体位置(相对坐标)---
entity_pos = []
for entity in world.landmarks:# 地标位置(障碍物)
if not entity.boundary:
entity_pos.append(entity.state.p_pos - agent.state.p_pos)
# --- 其他智能体信息 ---
comm = [] # 通信信号(本场景未使用)
other_pos = []# 其他智能体相对位置
other_vel = []# 其他智能体速度(仅合作者)
for other in world.agents:
if other is agent: continue# 排除自身
comm.append(other.state.c)
other_pos.append(other.state.p_pos - agent.state.p_pos)
if not other.adversary:# 只记录合作者的速度
other_vel.append(other.state.p_vel)
# 合并观测向量:[自身速度, 自身位置, 地标位置, 其他智能体位置, 合作者速度]
return np.concatenate( + + entity_pos + other_pos + other_vel)
</code></pre>
<h2 id="10效果图后续再补充">10.效果图(后续再补充)</h2>
<blockquote>
<h3 id="实验配置">实验配置:</h3>
<p>6围捕2,障碍物:2</p>
<p>--num-episodes20000 --max-episode-len 25 --lr 5e-5,其余默认</p>
</blockquote>
<p><img src="https://gitee.com/dragonpig7/picgo/raw/master/img/202503151320514.png" alt="training_curve" loading="lazy"></p>
<p><img src="https://gitee.com/dragonpig7/picgo/raw/master/img/202503151320543.png" alt="per_agent_training_curve" loading="lazy"></p>
<p><img src="https://gitee.com/dragonpig7/picgo/raw/master/img/202503151320816.png" alt="success_rate_curve" loading="lazy"></p>
<p><img src="https://gitee.com/dragonpig7/picgo/raw/master/img/202503151320346.png" alt="safe_output170" loading="lazy"></p>
</div>
<div id="MySignature" role="contentinfo">
<p>作者:七龙猪</p>
<p>出处:{postUrl}</p>
<p>本站使用「CC BY 4.0」创作共享协议,转载请在文章明显位置注明本帅哥及出处。</p><br><br>
来源:https://www.cnblogs.com/7dragonpig/p/-/MPE-Practice-Guide
頁:
[1]