抓特务 posted on 2021-4-18 08:16:00

RocketMQ transactions in Go, implemented with rocketmq-client-go

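<p>I think most people who use RocketMQ do so mainly for its transactional messages, so I took the official sample code for a test run.</p>
<h2>Environment</h2>
<p>Install RocketMQ with Docker.</p>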
<div class="cnblogs_code">
<pre># Create these directories
/docker/namesrv/logs
/docker/namesrv/store
/docker/rocketmq/logs
/docker/rocketmq/store

# Create this file
/docker/rocketmq/broker.conf
File contents:
brokerClusterName = DefaultCluster
brokerName = broker-a
brokerId = 0
deleteWhen = 04
fileReservedTime = 48
brokerRole = ASYNC_MASTER
flushDiskType = ASYNC_FLUSH
brokerIP1 = {host external IP}

# Deploy and start the nameserver
docker run -d -p 9876:9876 -v /docker/namesrv/logs:/root/logs -v /docker/namesrv/store:/root/store --name rmqnamesrv foxiswho/rocketmq:server-4.5.1
# Deploy and start the broker
docker run -d -p 10911:10911 -p 10909:10909 -e TZ="Asia/Shanghai" -v /docker/rocketmq/logs:/root/logs -v /docker/rocketmq/store:/root/store -v /docker/rocketmq/broker.conf:/opt/rocketmq-4.5.1/conf/broker.conf --name rmqbroker -e "NAMESRV_ADDR=192.168.100.30:9876" -e "MAX_POSSIBLE_HEAP=200000000" foxiswho/rocketmq:broker-4.5.1 sh mqbroker -c /opt/rocketmq-4.5.1/conf/broker.conf
# Deploy and start the console
docker run -d --name rmqconsole -p 8180:8080 -e "JAVA_OPTS=-Drocketmq.namesrv.addr=192.168.100.30:9876 -Dcom.rocketmq.sendMessageWithVIPChannel=false" -t styletang/rocketmq-console-ng</pre>
</div>
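<p>Open http://{host external IP}:8180 in a browser to reach the console; if the console page loads without errors, the deployment succeeded.</p>
<h3>Go code:</h3>
<p>Server side (the transactional producer):</p>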
<div class="cnblogs_code">
<pre>package main

import (
    "context"
    "fmt"
    "os"
    "strconv"
    "sync"
    "sync/atomic"
    "time"

    "github.com/apache/rocketmq-client-go/v2"
    "github.com/apache/rocketmq-client-go/v2/primitive"
    "github.com/apache/rocketmq-client-go/v2/producer"
)

// DemoListener records the simulated local transaction state for each transaction ID.
type DemoListener struct {
    localTrans       *sync.Map
    transactionIndex int32
}

func NewDemoListener() *DemoListener {
    return &amp;DemoListener{
      localTrans: new(sync.Map),
    }
}

func (dl *DemoListener) ExecuteLocalTransaction(msg *primitive.Message) primitive.LocalTransactionState {
    nextIndex := atomic.AddInt32(&amp;dl.transactionIndex, 1)
    fmt.Printf("nextIndex: %v for transactionID: %v\n", nextIndex, msg.TransactionId)
    // Store a state of 1 (commit), 2 (rollback) or 3 (unknown) for the later check.
    status := nextIndex % 3
    dl.localTrans.Store(msg.TransactionId, primitive.LocalTransactionState(status+1))

    // SendMessageInTransaction calls ExecuteLocalTransaction.
    // If ExecuteLocalTransaction returns primitive.UnknowState, the broker calls CheckLocalTransaction to check the message state.
    // If it returns primitive.CommitMessageState or primitive.RollbackMessageState, CheckLocalTransaction is not called.
    return primitive.UnknowState
}

func (dl *DemoListener) CheckLocalTransaction(msg *primitive.MessageExt) primitive.LocalTransactionState {
    fmt.Printf("%v msg transactionID : %v\n", time.Now(), msg.TransactionId)
    v, existed := dl.localTrans.Load(msg.TransactionId)
    if !existed {
      fmt.Printf("unknown msg: %v, return Commit\n", msg)
      return primitive.CommitMessageState
    }
    state := v.(primitive.LocalTransactionState)
    switch state {
    case primitive.CommitMessageState: // 1
      fmt.Printf("checkLocalTransaction COMMIT_MESSAGE: %v\n", msg)
      return primitive.CommitMessageState
    case primitive.RollbackMessageState: // 2
      fmt.Printf("checkLocalTransaction ROLLBACK_MESSAGE: %v\n", msg)
      return primitive.RollbackMessageState
    case primitive.UnknowState: // 3
      fmt.Printf("checkLocalTransaction unknown: %v\n", msg)
      return primitive.UnknowState
    default:
      fmt.Printf("checkLocalTransaction default COMMIT_MESSAGE: %v\n", msg)
      return primitive.CommitMessageState
    }
}

func main() {
    p, err := rocketmq.NewTransactionProducer(
      NewDemoListener(),
      producer.WithNsResolver(primitive.NewPassthroughResolver([]string{"192.168.100.30:9876"})),
      producer.WithRetry(1),
    )
    if err != nil {
      fmt.Printf("create producer error: %s\n", err.Error())
      os.Exit(1)
    }
    err = p.Start()
    if err != nil {
      fmt.Printf("start producer error: %s\n", err.Error())
      os.Exit(1)
    }

    topic := "test"
    for i := 0; i &lt; 10; i++ {
      res, err := p.SendMessageInTransaction(context.Background(),
            primitive.NewMessage(topic, []byte("Hello RocketMQ again "+strconv.Itoa(i))))

      if err != nil {
            fmt.Printf("send message error: %s\n", err)
      } else {
            fmt.Printf("send message success: result=%s\n", res.String())
      }
    }
    // Keep the process alive so the broker has time to call CheckLocalTransaction.
    time.Sleep(5 * time.Minute)
    err = p.Shutdown()
    if err != nil {
      fmt.Printf("shutdown producer error: %s\n", err.Error())
    }
}</pre>
</div>
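<p>The listener above stores <code>nextIndex % 3 + 1</code> as the state for each transaction, and the check callback treats 1 as commit, 2 as rollback and 3 as unknown. To see which of the ten messages end up in which state, here is a small standalone sketch. It does not import the client library; the <code>LocalTransactionState</code> type below is a local stand-in that mirrors those three values, and <code>stateForIndex</code> is a name invented for this illustration.</p>

```go
package main

import "fmt"

// LocalTransactionState is a local stand-in mirroring the values the
// listener relies on: 1 = commit, 2 = rollback, 3 = unknown.
type LocalTransactionState int

const (
	CommitMessageState LocalTransactionState = iota + 1
	RollbackMessageState
	UnknowState
)

// String gives a readable name for each state.
func (s LocalTransactionState) String() string {
	switch s {
	case CommitMessageState:
		return "commit"
	case RollbackMessageState:
		return "rollback"
	case UnknowState:
		return "unknown"
	}
	return "invalid"
}

// stateForIndex reproduces the listener's arithmetic: index % 3, plus 1.
func stateForIndex(index int32) LocalTransactionState {
	return LocalTransactionState(index%3 + 1)
}

func main() {
	counts := map[LocalTransactionState]int{}
	// The producer sends ten messages, so the counter runs from 1 to 10.
	for i := int32(1); i <= 10; i++ {
		s := stateForIndex(i)
		counts[s]++
		fmt.Printf("transaction %2d -> %s\n", i, s)
	}
	fmt.Printf("commit=%d rollback=%d unknown=%d\n",
		counts[CommitMessageState], counts[RollbackMessageState], counts[UnknowState])
}
```

<p>With this mapping, indexes 3, 6 and 9 commit, indexes 1, 4, 7 and 10 roll back, and indexes 2, 5 and 8 stay unknown. So, assuming the broker checks every transaction once, only three of the ten messages should become visible to the consumer after the first round of checks.</p>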
<p>Client side (the consumer):</p>
<div class="cnblogs_code">
<pre>package main

import (
    "context"
    "fmt"
    "os"
    "time"

    "github.com/apache/rocketmq-client-go/v2"
    "github.com/apache/rocketmq-client-go/v2/consumer"
    "github.com/apache/rocketmq-client-go/v2/primitive"
)

func main() {
    c, err := rocketmq.NewPushConsumer(
      consumer.WithGroupName("testGroup"),
      consumer.WithNsResolver(primitive.NewPassthroughResolver([]string{"192.168.100.30:9876"})),
    )
    if err != nil {
      fmt.Println(err.Error())
      os.Exit(-1)
    }
    err = c.Subscribe("test", consumer.MessageSelector{}, func(ctx context.Context,
      msgs ...*primitive.MessageExt) (consumer.ConsumeResult, error) {
      // Print each received message on its own line.
      for _, msg := range msgs {
            fmt.Printf("subscribe callback: %v \n", msg)
      }
      // This acts as the consumer's message ack; on failure, return consumer.ConsumeRetryLater instead.
      return consumer.ConsumeSuccess, nil
    })
    if err != nil {
      fmt.Println(err.Error())
    }
    // Note: start after subscribe
    err = c.Start()
    if err != nil {
      fmt.Println(err.Error())
      os.Exit(-1)
    }
    time.Sleep(time.Hour)
    err = c.Shutdown()
    if err != nil {
      fmt.Printf("shutdown Consumer error: %s\n", err.Error())
    }
}</pre>
</div>
<p>A screenshot of the management console:</p>
<p><img src="https://img2020.cnblogs.com/blog/209993/202104/209993-20210418081614923-12942639.png"></p>

</div>
<div id="MySignature" role="contentinfo">
    Windows technology enthusiast<br><br>
Source: https://www.cnblogs.com/majiang/p/14672799.html