Source: https://juejin.im/post/5cabf6dce51d456e8b07dd03?utm_source=gold_browser_extension

Symptom

One evening, while a colleague was in the middle of a deployment, a flood of alerts suddenly fired in production, many of them about database deadlocks. The alert message looked like this:

{"errorCode":"SYSTEM_ERROR","errorMsg":"nested exception is org.apache.ibatis.exceptions.PersistenceException:
Error updating database. Cause: ERR-CODE: [TDDL-4614][ERR_EXECUTE_ON_MYSQL]
Deadlock found when trying to get lock;
The error occurred while setting parameters
### SQL: update fund_transfer_stream set gmt_modified=now(),state = ? where fund_transfer_order_no = ? and seller_id = ? and state = 'NEW'


Source: https://mp.weixin.qq.com/s/piU5jHJywZZq1REglZb76w

The "ins and outs" of the internal implementation of Golang channels and their related operations.

Concurrency in Golang is much more than just syntax.

It's a design pattern.

A pattern that is a repeatable solution to a commonly occurring problem when working with concurrency, because even

Concurrency Needs to be Synchronized.

And Go relies on a concurrency model called CSP (Communicating Sequential Processes) to achieve this pattern of synchronization through channels. Go's core philosophy for concurrency is:

Do not communicate by sharing memory; instead, share memory by communicating.

But Go also trusts you to do the right thing. So the rest of this post will try to open this envelope of Go's philosophy and see how channels use a queue to achieve it.

What it takes to be a Channel.

package main

import (
    "fmt"
    "time"
)

func goRoutineA(a <-chan int) {
    val := <-a
    fmt.Println("goRoutineA received the data", val)
}

func main() {
    ch := make(chan int)
    go goRoutineA(ch)
    time.Sleep(time.Second * 1)
}

[Images: goroutineA blocked on the empty channel ch]

So it's the responsibility of the channel in Go to make a goroutine that is blocked on the channel (while receiving or sending data) runnable again.

If you are unfamiliar with the Go scheduler, please read this nice introduction to it: https://morsmachine.dk/go-scheduler

Channel Structure

In Go, the "channel structure" is the basis of message passing between goroutines. So what does this channel structure look like after we create it?

ch := make(chan int, 3)

[Image: runtime structure of ch after make(chan int, 3)]

Looks good. But what does this really mean, and where does a channel get its structure from? Let's look at a few important structs before going any further.

hchan struct

When we write make(chan int, 3), the channel is created from the hchan struct, which has the following fields.

[Image: hchan struct fields in runtime/chan.go]

Let's put descriptions to a few fields that we encountered in the channel structure.

dataqsiz is the size of the buffer mentioned above: the N in make(chan T, N).

qcount is the number of elements currently in the buffer.

elemsize is the size of a single element of the channel's type.

buf is the circular queue where our data is actually stored (used only for buffered channels).

closed indicates whether the current channel is in the closed state. After a channel is created, this field is set to 0, that is, the channel is open; calling close sets it to 1, and the channel is closed.

sendx and recvx are the state fields of the ring buffer: the indices into the backing array at which the channel will send and receive data next.

recvq and sendq are waiting queues, which store the goroutines blocked while trying to read data from the channel or send data to it.

lock locks the channel for each read and write operation, as sending and receiving must be mutually exclusive operations.
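The fields described above can be sketched as a simplified Go struct. This is an illustration only: the field names follow runtime/chan.go, but the runtime-internal types (waitq of *sudog, mutex, unsafe buffer pointer) are stubbed out here.

```go
package main

import (
	"fmt"
	"unsafe"
)

// sudogStub stands in for runtime.sudog (a node wrapping a parked goroutine).
type sudogStub struct {
	next *sudogStub
	elem unsafe.Pointer // element being sent/received (may point into a stack)
}

// waitq is a FIFO list of parked goroutines, as used for recvq/sendq.
type waitq struct {
	first *sudogStub
	last  *sudogStub
}

type mutexStub struct{}

// hchan is a simplified sketch of the runtime's channel header.
type hchan struct {
	qcount   uint           // number of elements currently in the queue
	dataqsiz uint           // size of the circular buffer: the N in make(chan T, N)
	buf      unsafe.Pointer // points to the circular buffer array
	elemsize uint16         // size of one element
	closed   uint32         // 0 = open, 1 = closed
	sendx    uint           // next buffer slot to send into
	recvx    uint           // next buffer slot to receive from
	recvq    waitq          // goroutines blocked on receive
	sendq    waitq          // goroutines blocked on send
	lock     mutexStub      // protects all of the fields above
}

func main() {
	c := hchan{dataqsiz: 3, elemsize: 8}
	fmt.Println(c.dataqsiz, c.qcount, c.closed)
}
```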

So what is this sudog?

sudog struct

sudog represents a goroutine waiting in a list, for example to send on or receive from a channel.

[Image: sudog struct fields in runtime/runtime2.go]

Let's try to wrap our heads around the channel structure again, step by step. It's important to have a clear picture of these structs, as this is what gives channels their power in Go.

[Image: example code — goroutineA and goroutineB both receiving from ch]

What will be the structure of the channel before line 22?

[Image: runtime structure of ch with both goroutines parked on recvq]

Pay attention to the highlighted lines 47 and 48 above, and remember the description of recvq from earlier:

recvq is used to store the blocked goroutines that are trying to read data from the channel.

In our example code, before line 22 there are two goroutines (goroutineA and goroutineB) trying to read data from the channel ch.

Since no data has been put on the channel before line 22, both goroutines are blocked on the receive operation; each has been wrapped inside a sudog struct and placed on the channel's recvq.

sudog represents the goroutine.

recvq and sendq are essentially linked lists of sudog, which look roughly like this:
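The list behavior can be sketched as a toy FIFO queue of sudog-like nodes (an illustration of how blocked goroutines are queued on recvq/sendq, not the real runtime code, which lives in runtime/chan.go):

```go
package main

import "fmt"

// sudog stands in for a parked goroutine waiting on a channel.
type sudog struct {
	id   int // stands in for the *g pointer to the goroutine
	next *sudog
}

// waitq is a FIFO linked list, like the channel's recvq and sendq.
type waitq struct {
	first *sudog
	last  *sudog
}

// enqueue appends a blocked goroutine to the tail of the queue.
func (q *waitq) enqueue(s *sudog) {
	s.next = nil
	if q.last == nil {
		q.first = s
	} else {
		q.last.next = s
	}
	q.last = s
}

// dequeue pops the goroutine that has been waiting the longest.
func (q *waitq) dequeue() *sudog {
	s := q.first
	if s == nil {
		return nil
	}
	q.first = s.next
	if q.first == nil {
		q.last = nil
	}
	s.next = nil
	return s
}

func main() {
	var q waitq
	q.enqueue(&sudog{id: 1})
	q.enqueue(&sudog{id: 2})
	fmt.Println(q.dequeue().id, q.dequeue().id) // FIFO order: 1 2
}
```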

[Image: recvq as a linked list of sudog nodes]

These structures are really important.

Let's see what happens when we try to put data on the channel ch.

Send Operation Steps: c <- x

The underlying cases of a send operation on a channel:

1. Sending on a nil channel

[Image: chansend — the nil channel case]

If we send on a nil channel, the current goroutine suspends forever: a nil channel can never become ready.

2. Sending on a closed channel

[Image: chansend — the closed channel case]

If we try to send data on a closed channel, our goroutine panics.
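Case 2 is easy to observe directly (case 1 isn't, since sending on a nil channel simply blocks forever). A minimal sketch, using recover only so the program can report the panic instead of crashing:

```go
package main

import "fmt"

// sendOnClosed sends on a closed channel and returns the recovered
// panic value ("send on closed channel") instead of crashing.
func sendOnClosed() (err interface{}) {
	defer func() { err = recover() }()
	ch := make(chan int, 1)
	close(ch)
	ch <- 1 // panics: send on closed channel
	return nil
}

func main() {
	fmt.Println("panic:", sendOnClosed())
}
```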

3. A goroutine is blocked on the channel: the data is sent directly to the goroutine.
[Image: chansend — direct send to a waiting receiver]

This is where the recvq structure plays such an important role. If there is any goroutine in recvq, it is a waiting receiver, and the current write operation can pass the value directly to that receiver. Below is the implementation of the send function.

[Image: implementation of the runtime send function]

Pay attention to line 396, goready(gp, skip+1): the goroutine that was blocked waiting for the data is made runnable again by calling goready, and the Go scheduler will run it again.
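The direct-handoff case can be sketched from user code: start the receive first so the receiver parks on recvq, then send. (The 10ms sleep is just a crude way to make it likely the receiver is already parked when the send happens.)

```go
package main

import (
	"fmt"
	"time"
)

// handoff demonstrates case 3: a receiver already parked on the
// channel's recvq gets the value handed to it directly, and goready
// makes it runnable again.
func handoff() int {
	ch := make(chan int) // unbuffered: a send must pair with a receiver
	done := make(chan int)

	go func() {
		v := <-ch // this goroutine parks on ch's recvq...
		done <- v
	}()

	time.Sleep(10 * time.Millisecond) // give the receiver time to park
	ch <- 42                          // ...until this send wakes it directly
	return <-done
}

func main() {
	fmt.Println("receiver got", handoff()) // receiver got 42
}
```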

4. Buffered channel, with space currently available in hchan.buf: put the data in the buffer.

[Image: chansend — enqueue into hchan.buf]

chanbuf(c, i) accesses the corresponding memory area.

The runtime determines whether hchan.buf has free space by comparing qcount and dataqsiz. It enqueues the element by copying the area pointed to by the ep pointer into the ring buffer, then adjusts sendx and qcount.
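The bookkeeping described above (sendx/recvx wrapping around dataqsiz, qcount tracking fullness) can be sketched as a tiny ring buffer. This is an illustration of the enqueue step, not the runtime's actual code:

```go
package main

import "fmt"

// ring mimics hchan's circular buffer fields for int elements.
type ring struct {
	buf      []int
	dataqsiz int // capacity, the N in make(chan int, N)
	sendx    int // next slot to write
	recvx    int // next slot to read
	qcount   int // elements currently stored
}

// send enqueues v if there is space; the real chansend would park
// the goroutine on sendq instead of returning false.
func (r *ring) send(v int) bool {
	if r.qcount == r.dataqsiz {
		return false // buffer full
	}
	r.buf[r.sendx] = v
	r.sendx++
	if r.sendx == r.dataqsiz { // wrap around, like the runtime does
		r.sendx = 0
	}
	r.qcount++
	return true
}

// recv dequeues the oldest element, adjusting recvx and qcount.
func (r *ring) recv() (int, bool) {
	if r.qcount == 0 {
		return 0, false // buffer empty
	}
	v := r.buf[r.recvx]
	r.recvx++
	if r.recvx == r.dataqsiz {
		r.recvx = 0
	}
	r.qcount--
	return v, true
}

func main() {
	r := &ring{buf: make([]int, 3), dataqsiz: 3}
	r.send(1)
	r.send(2)
	v, _ := r.recv()
	fmt.Println(v, r.sendx, r.recvx, r.qcount) // 1 2 1 1
}
```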

5. hchan.buf is full
[Image: chansend — parking the sender when the buffer is full]

A sudog object representing the current goroutine is created on the current stack.

acquireSudog obtains that sudog, the current goroutine is put into the parked state, and it is added to the channel's sendq.

Send operation summary

  • Lock the entire channel structure.
  • Try to dequeue a waiting goroutine from recvq; if one exists, hand the element to be written directly to that receiver.
  • If recvq is empty, determine whether the buffer has free space. If it does, copy the data from the current goroutine into the buffer (typedmemmove copies a value of type t to dst from src; internally it uses memmove, which copies a block of memory from one location to another).
  • If the buffer is full, the element to be written is saved in the sudog structure of the currently executing goroutine, and the current goroutine is enqueued on sendq and suspended by the runtime.

Point number 4 is really interesting.

If the buffer is full, the element to be written is saved in the structure of the currently executing goroutine.

Read it again, because this is why the unbuffered channel is actually called "unbuffered" even though the hchan struct has a buf field: for an unbuffered channel, if there is no receiver and you try to send data, the data is saved in the elem field of the sudog structure. (This holds true for a full buffered channel too.)
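The difference is visible from user code: an unbuffered channel has cap 0, so a send can only complete by pairing with a receiver, while a buffered channel can absorb a value into buf. A select with a default case probes this without blocking (a sketch of the point above):

```go
package main

import "fmt"

// trySend attempts a non-blocking send using select/default.
func trySend(ch chan int, v int) bool {
	select {
	case ch <- v:
		return true
	default:
		return false // the send would have blocked
	}
}

func main() {
	unbuffered := make(chan int)
	buffered := make(chan int, 1)
	fmt.Println(cap(unbuffered), trySend(unbuffered, 1)) // 0 false: no receiver waiting, no buf to use
	fmt.Println(cap(buffered), trySend(buffered, 1))     // 1 true: the value goes into buf
}
```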

Let me give you an example to clarify point number 4 in more detail. Suppose we have the code below.

[Image: example code — goroutineA sending on buffered channel c2]

What will be the runtime structure of the chan c2 at line number 10?

[Image: runtime structure of c2 at line 10]

You can see that even though we have put the int value 2 on the channel, buf does not hold the value; instead it is in the sudog structure of the goroutine. goroutineA tried to send the value over channel c2 while no receiver was ready, so goroutineA was added to the sendq list of channel c2 and parked as it blocked. We can look at the runtime structure of the blocking sendq to verify this.

[Image: runtime structure of the parked sender on sendq]

Now that we have an overview of the send operation on a channel, what happens once we send a value in our example code above, at line 22?

ch <- 3

As the channel's recvq has a goroutine in the wait state, it dequeues the first sudog and puts the data into that goroutine.

Remember: every transfer of a value on a Go channel happens with a copy of the value.

[Image: example program — sending a user value over a channel]

What will be the output of the above program? Just remember that a channel operates on a copy of the value.

So in our case, the channel will copy the value at g into its buffer.

Don't communicate by sharing memory; share memory by communicating.

Output

&{Ankur 25}
modifyUser Received Value &{Ankur Anand 100}
printUser goRoutine called &{Ankur 25}
&{Anand 100}

[Image: diagram of the value being copied on send and receive]
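A minimal sketch of the copy semantics (not the exact program in the screenshot, which sends a *pointer*, so mutations there are visible to the receiver because the pointer is what gets copied): when a plain struct is sent, it is copied at send time, so mutating the sender's variable afterwards does not change what the receiver sees.

```go
package main

import "fmt"

type user struct {
	name string
	age  int
}

// sendCopy sends a struct value, mutates the original, and returns
// what the receiver sees -- the value as it was at send time.
func sendCopy() user {
	ch := make(chan user, 1)
	u := user{name: "Ankur", age: 25}
	ch <- u // a copy of u is placed in the channel buffer
	u.name = "Anand"
	u.age = 100 // mutate the original after sending
	return <-ch
}

func main() {
	fmt.Println(sendCopy()) // {Ankur 25}
}
```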

Receive Operation Steps: <- ch

It's pretty much the same as the send operation.

[Image: receive operation flow (chanrecv)]
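One receive-side behavior worth showing from user code: receiving from a closed channel first drains any buffered values, then yields the element type's zero value with ok == false (the "comma ok" idiom).

```go
package main

import "fmt"

// drainClosed receives three times from a closed channel that held
// two buffered values, recording each value and its ok flag.
func drainClosed() (vals []int, oks []bool) {
	ch := make(chan int, 2)
	ch <- 10
	ch <- 20
	close(ch)
	for i := 0; i < 3; i++ {
		v, ok := <-ch // third receive: buffer empty and channel closed
		vals = append(vals, v)
		oks = append(oks, ok)
	}
	return vals, oks
}

func main() {
	v, ok := drainClosed()
	fmt.Println(v, ok) // [10 20 0] [true true false]
}
```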

Select

Multiplexing over multiple channels.

[Image: select statement example]

1. Channel operations are mutually exclusive, so select needs to acquire the locks on all channels involved in its cases. It does this by sorting the cases by hchan address to obtain a stable locking order, so locks are always taken in the same order (and each channel's mutex is locked only once, even if it appears in several cases).

sellock(scases, lockorder)

Each scase in the scases array is a struct containing the kind of operation in the current case and the channel it operates on.

[Image: scase struct fields]

kind is the type of operation in the current case; it can be CaseRecv, CaseSend, or CaseDefault.

2. Calculate the poll order, shuffling all the involved channels to provide the pseudo-random guarantee, then traverse all the cases one by one in that poll order to see if any of them is ready for communication. This poll order is what makes select operations not necessarily follow the order declared in the program.

[Images: pollorder computation and the case-polling loop in selectgo]

3. The select statement can return as soon as there is a channel operation that doesn't block, without even needing to touch the remaining channels once a ready one is found.
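A sketch of point 3 from user code: select returns as soon as one case is ready, and with a default case it never blocks at all.

```go
package main

import "fmt"

// pick returns which case of the select fired: the ready channel's
// case, or "none ready" via default when nothing can proceed.
func pick(a, b chan int) string {
	select {
	case v := <-a:
		return fmt.Sprintf("a:%d", v)
	case v := <-b:
		return fmt.Sprintf("b:%d", v)
	default:
		return "none ready"
	}
}

func main() {
	a := make(chan int, 1)
	b := make(chan int, 1)
	fmt.Println(pick(a, b)) // neither channel ready: the default case fires
	b <- 7
	fmt.Println(pick(a, b)) // only b is ready: its case is chosen
}
```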

4. If no channel is currently ready and there is no default case, the current g must be suspended on the corresponding wait queue of every channel, according to its case.

[Image: selectgo parking the goroutine on every channel's wait queue]

sg.isSelect is what indicates that goroutine is participating in the select statement.

5. Receive, send, and close operations during a select are similar to the general receive, send, and close operations on channels.

Source: https://blog.csdn.net/Rose1645/article/details/80696144

1. Overview

As of Kafka version 0.10.1.1, consumed offsets are by default stored in a Kafka topic named __consumer_offsets. In fact, as early as version 0.8.2.2, Kafka supported storing consumed offsets in a topic, but at that time the default was still to store them in the Zookeeper cluster. Now the official default is to store consumed offsets in a Kafka topic, while the interface for storing them in Zookeeper is still kept around and can be selected via the offsets.storage property.

2. Details

This official recommendation has its reasons. In earlier versions, Kafka had a fairly serious weakness: it used Zookeeper to record the consumption progress of every consumer/group. Although the JVM helped us with some optimizations along the way, consumers still had to interact with Zookeeper frequently, and issuing frequent writes to Zookeeper through the ZKClient API is inherently an inefficient operation; it is also a headache for later horizontal scaling. And if the Zookeeper cluster changed in the meantime, the throughput of the Kafka cluster suffered too.

The Kafka project actually proposed migrating offsets into Kafka itself quite early on. But since the default remained Zookeeper, it had to be configured manually, and anyone not very familiar with Kafka generally just accepted the default (i.e. storage in ZK). In newer versions of Kafka, consumed offsets are stored by default in a topic inside the Kafka cluster called __consumer_offsets.

The implementation is actually quite familiar: Kafka uses a topic of its own, with the combination of consumer group, topic, and partition as the key, and all consumed offsets are committed as messages to this topic. Because these messages are important enough that data loss cannot be tolerated, the acking level is set to -1: the producer only gets an ack after all ISR replicas have received the message (excellent data safety, at some cost in speed). Kafka therefore also maintains, in memory, a (group, topic, partition) triple mapping to the latest offset information, so a consumer fetching the latest offset reads it directly from memory. This also shows that offsets are committed per consumer: the consumer group, topic, and partition together determine which consumer in which group committed the offset.

Kafka offers three delivery semantics:

  • at least once
  • at most once
  • exactly once

On the producer side, guaranteeing semantics 1 and 2 is quite simple. At-least-once just needs synchronous acknowledgment (either leader-only acknowledgment or acknowledgment from all replicas; the latter is more fault-tolerant). At-most-once is the simplest: just keep sending asynchronously, which is also quite efficient. Exactly-once cannot yet be guaranteed on the producer side; it may be implemented in the future, along these lines: on top of synchronous acknowledgment, give every message a primary key, and if a key has been received before, discard the message.

On the consumer side, as everyone knows, you can control the offset and therefore control consumption; in fact the offset is only used on restart. While the machine is running normally we use the position: the place we are actually consuming in real time is the position, not the offset, and we can obtain the position of every message. If we save the current message's position to ZK (as the offset) before processing the message, we get at-most-once consumption: the save may succeed and then the machine may crash before the message is processed, so when the machine comes back up the message is lost. If instead we consume the message first and then save its position to ZK as the offset, we get at-least-once: the machine may crash after the message is consumed but before the offset is saved successfully. Exactly-once requires making the saving of the position and the consumption of the message a single atomic operation, for example saving the message and the position to HDFS together; the saved position is then the offset, and when the machine restarts it re-reads the offset from HDFS. That is exactly-once.

A consumer can read a message, write the offset to a log file, and then process the message. There is then a possibility that it crashes after storing the offset but before processing the message; the new consumer continues from that offset, and some messages are never processed. That is the "at most once" described above.
A consumer can instead read a message, process it, and record the offset last. Of course, if it crashes before recording the offset, the new consumer will re-consume some messages. That is the "at least once" described above.

"Exactly once" can be achieved by splitting the commit into two phases: commit once after saving the offset, and commit again after the message has been processed successfully. But there is a simpler approach: store the message's offset together with the result of processing the message. For example, when processing messages with a Hadoop ETL job, save the processed result and the offset in HDFS at the same time, which guarantees the message and the offset are handled together.

In our project, since messages may be consumed repeatedly, we adopted the at-least-once approach: after fetching messages from the Kafka cluster, we store them in the database and only then manually commit the offset. This guarantees that no message needing consumption is missed. Guaranteeing exactly-once would be more troublesome: the offset would have to be stored in the database too, so that the business logic and the offset commit are atomic (in the same transaction).
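The at-least-once pattern described above (process first, commit the offset after) can be sketched abstractly. All names here (message, offsetStore, consume) are hypothetical stand-ins, not a Kafka client API; in practice the store would be a database row and the commit a manual offset commit.

```go
package main

import "fmt"

// message is a hypothetical stand-in for a Kafka record.
type message struct {
	offset int64
	value  string
}

// offsetStore stands in for durable offset storage (e.g. a DB row).
type offsetStore struct{ committed int64 }

func (s *offsetStore) commit(o int64) { s.committed = o }

// consume processes messages starting from the committed offset.
// If a crash happens after process but before commit, the message is
// processed again on restart: duplicates are possible, loss is not.
func consume(msgs []message, store *offsetStore, process func(message)) {
	for _, m := range msgs {
		if m.offset < store.committed {
			continue // already handled and committed on a previous run
		}
		process(m)                 // 1) handle the message (write to DB, etc.)
		store.commit(m.offset + 1) // 2) only then commit the next offset to read
	}
}

func main() {
	store := &offsetStore{}
	var seen []string
	consume([]message{{0, "a"}, {1, "b"}}, store,
		func(m message) { seen = append(seen, m.value) })
	fmt.Println(seen, store.committed) // [a b] 2
}
```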

Recently, when packaging a Go project and running it in a Docker environment, I ran into a lot of ssl x509 errors. My rough guess was that it was an HTTPS certificate problem, so:

 docker run -v "/etc/ssl/certs/:/etc/ssl/certs/" 

Mapping the host machine's certificate directory into the container solved the problem.

P.S.: the vast majority of solutions that come up on Google are to set TLSClientConfig to skip HTTPS certificate verification. That obviously makes the symptom go away, but it actually sidesteps the real fix.

Coming back after New Year's Day, I found the article below, which also explains this problem; take a look if you're interested:

https://juejin.im/entry/5c29e65951882565986a2484?utm_source=gold_browser_extension