< 返回我的博客

yujinliang 发表于 2020-06-15 09:58

Tags:rust, block chain, p2p, kademlia

  1. https://github.com/cfromknecht/kademlia /findnode.go IterativeFindNode
//for golang
func (k *Kademlia) IterativeFindNode(target NodeID, delta int, final chan Contacts) {
	done := make(chan Contacts)

	ret := make(Contacts, BucketSize) //最终返回给caller的node set
	frontier := make(Contacts, BucketSize)//从未看到过的node set for findnode calling.
	seen := make(map[string]struct{}) //标记已经看到过的node set

	//1. 从本地route table尽可能找出delta个离target最近的node.
	for _, node := range k.routes.FindClosest(target, delta) {
		ret = append(ret, node)
		heap.Push(&frontier, node)
		seen[node.ID.String()] = struct{}{}
	}

	pending := 0
	for i := 0; i < delta && frontier.Len() > 0; i++ {
		//并发异步call findnode for each contact
		pending++
		contact := heap.Pop(&frontier).(Contact)
		go k.FindNode(contact, target, done)
	}

	for pending > 0 {
		//pending累计findnode call count.
		nodes := <-done //注意此处为同步阻塞等待取出远端返回的node set.
		pending--
		for _, node := range nodes {
			if _, ok := seen[node.ID.String()]; !ok {
				//只收集从未看到过的node.
				ret = append(ret, node) //收集之。
				heap.Push(&frontier, node)//之前未看到过的Node用于下次findnode.
				seen[node.ID.String()] = struct{}{} //标记已看到的Node.
			}
		}

		for pending < delta && frontier.Len() > 0 {
			//如果并未达到delta个数且frontier中还有之前从未看到过的Node, then call findnode 
			pending++
			contact := heap.Pop(&frontier).(Contact)
			go k.FindNode(contact, target, done)
		}
	} //注意本算法只是个尽力算法, 并没有满足closet node 原则, 找到的Node只是接近!只是尝试询问delta个node,//就停止继续call findnode,去逼近closet node .

	//通俗地说,首先找出本地离target最近的n个node, 进而同时findnode询问其离target的node set, 收集回复的node set ,并且排除已经看到过的,
	//如果询问的node个数达到delta,则放弃继续逼近closet node, 直接针对ret结果集按距离从小到大排序后返回!
	//delta越大则越有可能逼近找到closet node, 但是也会越耗时, lookup nodes收敛终止的代价越大!
	//原始kademlia paper中这样描述:标记closet node, 每一轮findnode完成结果收集,都去检测closet node是否变化,不变说明逼近成功,停止逼近,否则继续call findnode逼近!
	//delta越小性能越好,但是结果越不准确。

	sort.Sort(ret) //按id distance从小到大排序, 只返回前面最小的BucketSize个。
	if ret.Len() > BucketSize {
		ret = ret[:BucketSize]
	}

	final <- ret
}

kademlia 遵循的原理:熟人扎堆, 万能的朋友圈,圈套圈,总能找到你。

  1. https://github.com/prettymuchbryce/kademlia /dht.go iterate
func (dht *DHT) iterate(t int, target []byte, data []byte) (value []byte, closest []*NetworkNode, err error) {
	sl := dht.ht.getClosestContacts(alpha, target, []*NetworkNode{})

	// We keep track of nodes contacted so far. We don't contact the same node
	// twice.
	//标记已经请求过的Node.
	var contacted = make(map[string]bool)

	// According to the Kademlia white paper, after a round of FIND_NODE RPCs
	// fails to provide a node closer than closestNode, we should send a
	// FIND_NODE RPC to all remaining nodes in the shortlist that have not
	// yet been contacted.
	//标记是否继续请求其余未请求Node.
	queryRest := false

	// We keep a reference to the closestNode. If after performing a search
	// we do not find a closer node, we stop searching.
	//如果本地routetable中没有找到closet contacts, 则无法对外发起请求。
	if len(sl.Nodes) == 0 {
		return nil, nil, nil
	}

	//记录closetNode , 用于检测每一轮findnode之后是否查找到更近的Node。
	closestNode := sl.Nodes[0]

	if t == iterateFindNode {
		bucket := getBucketIndexFromDifferingBit(target, dht.ht.Self.ID)
		dht.ht.resetRefreshTimeForBucket(bucket)
	}

	removeFromShortlist := []*NetworkNode{}

	for {//一次循环代表一轮
		expectedResponses := []*expectedResponse{}
		numExpectedResponses := 0

		// Next we send messages to the first (closest) alpha nodes in the
		// shortlist and wait for a response

		//遍历本地routetable中查找的离target最近的Node集合。
		for i, node := range sl.Nodes {
			// Contact only alpha nodes
			//同时并发异步请求的Node个数不得大于alpha.
			//queryRest 用于标定是否继续请求sl中剩余未请求Node。
			if i >= alpha && !queryRest {
				break
			}

			// Don't contact nodes already contacted
			//排除已经请求过的Node。
			if contacted[string(node.ID)] == true {
				continue
			}

			contacted[string(node.ID)] = true //标记为已经请求过的状态。
			query := &message{}
			query.Sender = dht.ht.Self
			query.Receiver = node

			switch t {
			case iterateFindNode:
				query.Type = messageTypeFindNode
				queryData := &queryDataFindNode{}
				queryData.Target = target
				query.Data = queryData
			case iterateFindValue:
				query.Type = messageTypeFindValue
				queryData := &queryDataFindValue{}
				queryData.Target = target
				query.Data = queryData
			case iterateStore:
				query.Type = messageTypeFindNode
				queryData := &queryDataFindNode{}
				queryData.Target = target
				query.Data = queryData
			default:
				panic("Unknown iterate type")
			}

			// Send the async queries and wait for a response
			//异步并发
			res, err := dht.networking.sendMessage(query, true, -1)
			if err != nil {
				// Node was unreachable for some reason. We will have to remove
				// it from the shortlist, but we will keep it in our routing
				// table in hopes that it might come back online in the future.
				//收集没有响应的Node。
				removeFromShortlist = append(removeFromShortlist, query.Receiver)
				continue
			}
			//收集已发出rpc请求的返回结果,用于随后集中等待收集peer node 响应结果。
			expectedResponses = append(expectedResponses, res)
		}

        //从sl(closet node set)中删除无响应者!切记:并非从route table中删除!因为去中心,分布式网络模式特性,决定了每一个节点随时可能上线或下线。
		//通俗地讲:离自己最近的熟人朋友,也不可能随叫随到!但是不能因为未响应就认定不再是熟人朋友!这是不稳定的!
		for _, n := range removeFromShortlist {
			sl.RemoveNode(n)
		}

		numExpectedResponses = len(expectedResponses) //统计需要等待收集结果的数量。


		resultChan := make(chan (*message)) //将结果统一汇入此channel.
		for _, r := range expectedResponses {
			//依次遍历需要收集结果的rpc 响应集, 并且为每一个rpc response启动一个独立的go 协程监测响应。
			go func(r *expectedResponse) {
				select {
				case result := <-r.ch:
					//每一个rpc 请求成功后都会返回一个channel,用于汇报远端响应结果。
					if result == nil {
						// Channel was closed
						return
					}
					dht.addNode(newNode(result.Sender)) //更新routetable, 认识result.Sender。
					resultChan <- result //向外层继续汇总每一个go 协程的响应结果。
					return
				case <-time.After(dht.options.TMsgTimeout):
					dht.networking.cancelResponse(r) //取消这个已经超时的rpc.
					return
				}
			}(r)
		}

		//同步等待收集最终rpc响应结果。
		var results []*message
		if numExpectedResponses > 0 {
		Loop:
			for {
				select {
				case result := <-resultChan:
					if result != nil {
						results = append(results, result)
					} else {
						numExpectedResponses--
					}
					if len(results) == numExpectedResponses {
						close(resultChan)
						break Loop
					}
				case <-time.After(dht.options.TMsgTimeout):
					close(resultChan)
					break Loop
				}
			}

			for _, result := range results {
				if result.Error != nil {
					sl.RemoveNode(result.Receiver) //去除未响应Node。
					continue
				}
				switch t {
				case iterateFindNode:
					responseData := result.Data.(*responseDataFindNode)
					//将某一个rpc 响应结果收入sl(本地closet node set)
					sl.AppendUniqueNetworkNodes(responseData.Closest)
				case iterateFindValue:
					responseData := result.Data.(*responseDataFindValue)
					// TODO When an iterativeFindValue succeeds, the initiator must
					// store the key/value pair at the closest node seen which did
					// not return the value.
					if responseData.Value != nil {
						return responseData.Value, nil, nil
					}
						//将某一个rpc 响应结果收入sl(本地closet node set)
					sl.AppendUniqueNetworkNodes(responseData.Closest)
				case iterateStore:
					responseData := result.Data.(*responseDataFindNode)
						//将某一个rpc 响应结果收入sl(本地closet node set)
					sl.AppendUniqueNetworkNodes(responseData.Closest)
				}
			}
		}

		//停止逼近closet node , 当queryRest=true && sl(本地closet node set)为空,
		//意味着lookup closet nodes失败了。
		if !queryRest && len(sl.Nodes) == 0 {
			return nil, nil, nil
		}

		sort.Sort(sl) //太重要了,按距离从小到大对sl排序。

		// If closestNode is unchanged then we are done
		//关键!!! 停止逼近closet node的决定性条件
		//即本轮findnode rpc response处理后,监测closet node是否与上一轮不同,相同不变则停止逼近,或queryRest=true时停止逼近。
		if bytes.Compare(sl.Nodes[0].ID, closestNode.ID) == 0 || queryRest {
			// We are done
			switch t {
			case iterateFindNode:
				if !queryRest {
					queryRest = true
					continue
				}
				return nil, sl.Nodes, nil
			case iterateFindValue:
				return nil, sl.Nodes, nil
			case iterateStore:
				//向closet nodes 发送store命令。
				for i, n := range sl.Nodes {
					if i >= k {
						return nil, nil, nil
					}

					query := &message{}
					query.Receiver = n
					query.Sender = dht.ht.Self
					query.Type = messageTypeStore
					queryData := &queryDataStore{}
					queryData.Data = data
					query.Data = queryData
					dht.networking.sendMessage(query, false, -1)
				}
				return nil, nil, nil
			}
		} else {
			//非常关键, 记录本轮findnode rpc 响应中closet node. 每一轮找出一个最近的node全局记录,用于与下一轮找出的closet node比较,
			//相同不变,则停止逼近closet node, 否则继续。
			closestNode = sl.Nodes[0]
		}
	}
}

kademlia paper中说,lookup nodes直到找不出更近的closet node为止, 我想首先kademlia p2p overlay network中每个节点都必须诚实且严格遵守kademlia protocol , 其次kademlia 这种熟人朋友抱团扎堆的喜好,下线越多,lookup nodes失败的可能性越大或者说精度越差; 总结关键在于kademlia node id的合法性, 可否验证身份!不能随便模拟!

万能的朋友圈,但不是全能的!lookup nodes也许成功,也许是失败, 也许找到近似接近closet node。

大家(所有节点)按同一规则扎堆结伴(distance),按同一个规则 (distance)找人, 理论上一定能找到target所在的熟人(朋友)圈, 除非节点不诚实或下线的比例太大。

当探测到一个node节点无反应(下线)时, 是否立即将其删除,加入新节点!若是则网络可用性提高, 但是可靠性和安全性下降,kademlia倾向于不会轻易删除未响应节点, 从而有效避免一定的安全风险!俗话说,好朋友并非天天腻乎在一起!喜新厌旧容易招致不安全。

kademlia挺有人情味的,不轻易抛弃放弃。

  • kademlia attack

underlay network(底层网络):

Spoofing, Eavesdropping, Packet modifications(欺骗、窃听、数据包修改)

Overlay routing: Eclipse attack(日蚀攻击) Sybil attack(梅比尔攻击) Adversarial routing(对抗路由)

Other attacks: Denial‐of‐Service Data Storage

Overlay network must provide end‐to‐end security(覆盖网络必须提供端到端的安全性)

Attacker: Cuts off a part of the network(攻击者:切断网络的一部分)

加入大量不诚实的伪装节点,从而控制overlay network.

导致:Lookups fail, data corruption, partitioning。

对策:Prevent a node from choosing its ID freely(关键在于kademlia node id的合法性, 可否验证身份!不能随便模拟!)

[具体方案A]

Simple solution: Use NodeID := H( IP + Port ) No authentication, problems with NAT(IP地址不固定) IP spoofing still possible(仍然可IP欺骗)


[具体方案B]

Better solution: Cryptographic NodeID NodeID := H( public‐key ) (采用不对称加密技术, 对公钥hash得到NodeID, 好处多多) Allows authentication, key exchange, signing messages(可以验证身份,签名消息)

很显然方案B非常优秀,但是秘钥的生成管理也是个问题,当然由中心化的权威组织发放,则安全最有保障,但是不符合p2p、区块链去中心化的思想;如果私人自己生成秘钥,虽然有效降低了风险,但是仍然存在风险漏洞。

  1. https://github.com/jeffrey-xiao/kademlia-dht-rs /src/node/mod.rs lookup_nodes
    /// Iteratively looks up nodes to determine the closest nodes to `key`. The search begins by
    /// selecting `CONCURRENCY_PARAM` nodes in the routing table and adding it to a shortlist. It
    /// then sends out either `FIND_NODE` or `FIND_VALUE` RPCs to `CONCURRENCY_PARAM` nodes not yet
    /// queried in the shortlist. The node will continue to fill its shortlist until it did not find
    /// a closer node for a round of RPCs or if runs out of nodes to query. Finally, it will query
    /// the remaining nodes in its shortlist until there are no remaining nodes or if it has found
    /// `REPLICATION_PARAM` active nodes.
    fn lookup_nodes(&mut self, key: &Key, find_node: bool) -> ResponsePayload {
        //首先加锁从本地routetable中请求离key最近的Nodes.
        let routing_table = self.routing_table.lock().unwrap();
        let closest_nodes = routing_table.get_closest_nodes(key, CONCURRENCY_PARAM);
        drop(routing_table); //代表立即释放锁。

        //通过比较找出本地closest nodes中最小距离Node。
        let mut closest_distance = Key::new([255u8; KEY_LENGTH]);
        for node_data in &closest_nodes {
            closest_distance = cmp::min(closest_distance, key.xor(&node_data.id))
        }

        // initialize found nodes, queried nodes, and priority queue
        //标记发现看到过的Nodes.
        let mut found_nodes: HashSet<NodeData> = closest_nodes.clone().into_iter().collect();
        found_nodes.insert((*self.node_data).clone());
        //标记已经请求过的Nodes.
        let mut queried_nodes = HashSet::new();
        queried_nodes.insert((*self.node_data).clone());

        //标记待请求优先级队列(按距离)
        let mut queue: BinaryHeap<NodeDataDistancePair> = BinaryHeap::from(
            closest_nodes
                .into_iter()
                .map(|node_data| NodeDataDistancePair(node_data.clone(), node_data.id.xor(key)))
                .collect::<Vec<NodeDataDistancePair>>(),
        );

        let (tx, rx) = channel(); //用于收集findnode rpc response.

        let mut concurrent_thread_count = 0; //用于累计并发异步请求数。

        // spawn initial find requests
        for _ in 0..CONCURRENCY_PARAM {
            if !queue.is_empty() { //开始并发异步findnode rpc.
                self.clone().spawn_find_rpc( 
                    queue.pop().unwrap().0,
                    key.clone(),
                    tx.clone(),
                    find_node,
                );
                concurrent_thread_count += 1;
            }
        }

        // loop until we could not find a closer node for a round or if no threads are running
        while concurrent_thread_count > 0 { //如果>0, 说明需要收集处理findnode rpc reponse.
            //如果发起的并发异步findnode rpc不够数,且待请求优先级队列不空,则继续并发异步请求findnode rpc.
            while concurrent_thread_count < CONCURRENCY_PARAM && !queue.is_empty() {
                self.clone().spawn_find_rpc(
                    queue.pop().unwrap().0,
                    key.clone(),
                    tx.clone(),
                    find_node,
                );
                concurrent_thread_count += 1;
            }

            let mut is_terminated = true; //用于标记是否停止继续逼近closet distance node.
            let response_opt = rx.recv().unwrap(); //同步阻塞等待findnode rpc reponse.
            concurrent_thread_count -= 1; //收到一个请求,意味着一个findnode rpc 完毕,故此减一。

            //具体处理findnode rpc response.
            match response_opt { 
                Some(Response {
                    payload: ResponsePayload::Nodes(nodes),
                    receiver,
                    ..
                }) => {
                    //本case处理远端Node反馈的closet nodes.
                    queried_nodes.insert(receiver); //将此给出响应的Node加入已请求Node集合。
                    for node_data in nodes { //处理此远端Node的closet node 集合(离target key)。
                        let curr_distance = node_data.id.xor(key);

                        if !found_nodes.contains(&node_data) { //如果从未发现看到过,则加入处理, 否则丢弃。
                            if curr_distance < closest_distance {
                                //如果发现更进的距离,即更近的Node, 则记录之。
                                closest_distance = curr_distance;
                                is_terminated = false; //发现比上一轮更近的Node, 意味着逼近需要继续,不要停止。
                            }

                            found_nodes.insert(node_data.clone()); //标记已经发现看到了此Node.
                            let dist = node_data.id.xor(key);
                            let next = NodeDataDistancePair(node_data.clone(), dist);
                            queue.push(next.clone()); //将此Node加入待请求优先级队列。
                        }
                    }
                }
                Some(Response {
                    payload: ResponsePayload::Value(value),
                    ..
                }) => return ResponsePayload::Value(value),
                _ => is_terminated = false, //默认case ,继续逼近closet node.
            }

            if is_terminated { //如果此标志为true, 则停止此循环迭代逼近。
                break;
            }
            debug!("CURRENT CLOSEST DISTANCE IS {:?}", closest_distance);
        }

        //走到此处代表逼近得到closet nodes. 或者没有findnode rpc 需要处理了。
        debug!(
            "{} TERMINATED LOOKUP BECAUSE NOT CLOSER OR NO THREADS WITH DISTANCE {:?}",
            self.node_data.addr, closest_distance,
        );

        //一般来说即使不足REPLICATION_PARAM个数Node,也不必再去逼近,故此下面while块可以忽略。
        // loop until no threads are running or if we found REPLICATION_PARAM active nodes
        while queried_nodes.len() < REPLICATION_PARAM {  //意思是说如果未凑足REPLICATION_PARAM, 则需继续逼近,直到凑足数。
            while concurrent_thread_count < CONCURRENCY_PARAM && !queue.is_empty() {
                self.clone().spawn_find_rpc(
                    queue.pop().unwrap().0,
                    key.clone(),
                    tx.clone(),
                    find_node,
                );
                concurrent_thread_count += 1;
            }
            if concurrent_thread_count == 0 {
                break;
            }

            let response_opt = rx.recv().unwrap();
            concurrent_thread_count -= 1;

            match response_opt {
                Some(Response {
                    payload: ResponsePayload::Nodes(nodes),
                    receiver,
                    ..
                }) => {
                    queried_nodes.insert(receiver);
                    for node_data in nodes {
                        if !found_nodes.contains(&node_data) {
                            found_nodes.insert(node_data.clone());
                            let dist = node_data.id.xor(key);
                            let next = NodeDataDistancePair(node_data.clone(), dist);
                            queue.push(next.clone());
                        }
                    }
                }
                Some(Response {
                    payload: ResponsePayload::Value(value),
                    ..
                }) => return ResponsePayload::Value(value),
                _ => {}
            }
        }

        //对已请求过的node集合按距离从小到大排序,然后返回指定个数Nodes.
        let mut ret: Vec<NodeData> = queried_nodes.into_iter().collect();
        ret.sort_by_key(|node_data| node_data.id.xor(key));
        ret.truncate(REPLICATION_PARAM);
        debug!("{} -  CLOSEST NODES ARE {:#?}", self.node_data.addr, ret);
        ResponsePayload::Nodes(ret)
    }

关切点:

(1) findnode rpc 调用应该实现timeout机制,避免无限期无意义等待

(2) 上面算法实现中,对于未响应的peer node 或rpc error的Node, 应该从本地queried_nodes中删除。

(3) 如果lookup_nodes本身也可以并发调用, 那么不同findnode rpc response如何区分?每一个rpc request and response需要排队串行或者统统附加上target key, and rpc sequence number or ID or Token? 总之需要唯一标记每一个rpc request and respose 与lookup_nodes的对应关系,这需要设计关切和决策。

(4) Kademlia paper及其许多相关设计和实现都建议,采用UDP来管理和维护Kademlia Overlay Network, 这样比价轻便,无状态,代价小。但是RPC request and repose设计实现需要自己考虑lookup_nodes -- findnode request -- findnode response的映射关系


  • 淘汰死节点,加入活节点Update kbucket

When a Kademlia node receives any message from another node, it updates the appropriate kbucket for the sender’s node ID (Kademlia Paper 规定一个Kademlia node 只要收到其他node的任何消息,则update kbucket with sender Node 这个设计这的很巧妙,只在消息收发之间就完成了网络的管理和维护,不需要额外复杂逻辑和处理流程, 及其轻便高效)

[kbucket update algorithm]

if the sender node already exists in the kbucket:

----Moves it to the tail of the list.

else If the bucket has fewer than k entries:

----Inserts the new sender at the tail of the list. //

else

----Pings the kbucket’s least­ recently seen node:

----`If the least­ recently seen node fails to respond:``

--------``it is evicted from the k­bucket and the new sender inserted at the tail.`

----else

--------it is moved to the tail of the list, and the new sender’s contact is discarded.

Kademlia Protocol 规定了必须实现的4个RPC API: PING, FIND_NODE, FIND_VALUE, STOREKademlia规定只要收到其他Node的RPC响应, 必须首先Update kbucket with the sender Node 学习新节点。学习算法为:

如果sender Node已经存在于相应kbucket, 则将此Node移动至对列尾,代表最新节点。

否则如果对应kbucket不满, 则直接将sender Node插入至队列尾。

否则ping一下kbucket队列头最老的Node, 如果失败, 则删除队列头最老Node并将sender Node插入至队列尾。如果ping成功,则丢弃sender Node并将队列头最老Node移动至队列尾(当然也可以将sender Node 放进一个cache中日后备用,不一定非要直接丢弃删除)。


对于lookup_nodes找到的某key的closet node set 我暂且称之为X set , kademlia paper并没有规定要求update kbucket with it, 因为没有必要, 只有当你主动向X set中的Node发消息(RPC call)并获得响应的时候, kademlia Node才会将其存入自己的routetable(kbucket)中。好比现实社会,我认识很多人,如:国家领导人、明星等等, 但是我与他们没有直接交互,所以不需要记录到我的个人通讯录中!只有直接交互过的人,我才可能记录。

此篇kademlia设计文档非常清晰:http://xlattice.sourceforge.net/components/protocol/kademlia/specs.html

https://colobu.com/2018/03/26/distributed-hash-table/


  • 新节点如何Join kademlia network(bootstrap)
  1. 新节点A如果没有自己的node ID, 则生成一个n。
  2. 新节点A必须知道某个引导节点B(又称种子节点, 其实就是kademlia 网络中某个已知节点, 通俗地说就是老人带新人), 并将B加入到kbucket中。
  3. 向B(A目前知道的唯一节点)发起Lookup_Nodes(n)

这种“自我定位”将使得Kademlia network中的其他节点(收到请求的节点)能够使用A的ID填充他们的K-桶,同时也能够使用那些查询过程的中间节点来填充A的K-桶。这个过程既让A获得了详细的路由表,也让其它节点知道了A节点的加入,一举两得!


  • Node ID 如何定义

Kademlia使用160位的哈希算法(比如 SHA1),完整的 ID 用二进制表示有160位,这样可以容纳2的160次方个节点,可以说是不计其数了。Kademlia把 key 映射到一个二叉树,每一个 key 都是这个二叉树的叶子.这是kademlia paper原始定义, 但是并非强制, 具体实现可以自己选择哈希算法和位数,比如SHA256等, 非常灵活!本质: aHash(一个唯一key ) => a ID Number, ID描述方式: (a) 一个数字(需要对应到编程语言的具体数值类型,并且需要注意大小端字节的区别)(b) 一个位数组(位切片, 如rust中的array/slice, 并非数值类型,不用考虑大小端字节)

A ⊕ B => distance M => M's left leading zeros (如: 00001000, 左端4个零) => 0的个数计为i => i- kbucket ,此i即为kbucket的索引, 2i <= distance(A, B) < 2(i+1).

//rust
#[derive(Ord, PartialOrd, PartialEq, Eq, Clone, Hash, Serialize, Deserialize, Default, Copy)]
pub struct Key(pub [u8; KEY_LENGTH]);
//golang
type NodeID [IDLength]byte
//golang
type NetworkNode struct {
	// ID is a 20 byte unique identifier
	ID []byte

	// IP is the IPv4 address of the node
	IP net.IP

	// Port is the port of the node
	Port int
}

sum left leading zeros

//golang
func (node NodeID) Xor(other NodeID) (ret NodeID) {
	for i := 0; i < IDLength; i++ {
		ret[i] = node[i] ^ other[i]
	}
	return
}

func (node NodeID) PrefixLen(other NodeID) int {
	distance := node.Xor(other)
	for i := 0; i < IDLength; i++ {
		for j := 0; j < 8; j++ {
			if (distance[i]>>uint8(7-j))&0x1 != 0 {
				return 8*i + j
			}
		}
	}
	return -1
}
//c/c++
static int common_bits(const unsigned char *id1, const unsigned char *id2)
{
int i, j;
unsigned char xor;
for(i = 0; i < 20; i++) {
  if(id1[i] != id2[i])
      break;
}

if(i == 20)
  return 160;

xor = id1[i] ^ id2[i];

j = 0;
while((xor & 0x80) == 0) {
  xor <<= 1;
  j++;
}

return 8 * i + j;
}

  • Kademlia RPC设计关切点 (并发和异步,UDP)

并发、异步、UDP,这三个核心特性决定了, 需要设计映设机制来保持request <-> response之间的对应关系!因为并发、异步和UDP的无连接特性, 你发你的request,我收我的response,无法保证request和response的对应关系!

(1) 设计思路: 所有并发异步发送的kademlia RPC 请求 都被打入串行队列中,当然会返回给caller一个stub,具体可以是一个channel等, 用于查收RPC response, 以先进先出的方式排队, 尾部入头部出, 每次只有仅有唯一一个kademlia RPC会通过UDP方式发出, 一定时间内收到response则认为收到了响应, 否则超时。这种设计简单粗暴,效率低下(没有并发), 而且易受攻击(因为RPC reponse也许是人家伪造地,没有验明正身)!我认为不可取。

(2) 设计思路:所有kademlia RPC同时并发异步发出,不排队不等待, 只是每一个RPC request中绑定了一个唯一的ID/TOKEN之类的身份牌, 并在本地的某个地方,以形如:map[ request ID : response stub] 存储了用于查收RPC response的stub, 具体可为:map[request ID : a channel]; 独立的listen机制负责接收所有的RPC response, 而远端Node在回复时, 必须为此RPC response绑定好对应的RPC request ID , 然后方可发送,本端收到RPC response 后, 会取出其RPC request ID, 并去本地map中查找, 若找到,则写入此response, 若未找到,则抛弃。

与方案(1)相比效率的确高出很多,唯一瓶颈在于收发都需要读写map[requestID: a channel], 可以考虑采用lock-free 数据结构代替大力度Mutex, 从而避免block。目前也只是想到方案(2)比较可行。

一个例子:https://github.com/jeffrey-xiao/kademlia-dht-rs


  • Reference

https://github.com/kelseyc18/kademlia_vis

http://xlattice.sourceforge.net/components/protocol/kademlia/specs.html

https://github.com/libp2p/rust-libp2p/tree/master/protocols/kad

https://github.com/libp2p/go-libp2p-kad-dht

https://github.com/cfromknecht/kademlia

https://github.com/leejunseok/kademlia-rs

https://github.com/dtantsur/rust-dht

https://github.com/jeffrey-xiao/kademlia-dht-rs

https://colobu.com/2018/03/26/distributed-hash-table/

https://github.com/jech/dht

https://github.com/libp2p/rust-libp2p

https://pdos.csail.mit.edu/~petar/papers/maymounkov-kademlia-lncs.pdf

https://blog.csdn.net/hoping/article/details/5307320

https://github.com/jakerachleff/tinytorrent

http://www.scs.stanford.edu/17au-cs244b/labs/projects/kaplan-nelson_ma_rachleff.pdf

https://github.com/jakerachleff/simple-kademlia

https://github.com/DavidKeller/kademlia

https://github.com/arvidn/libtorrent

https://blog.csdn.net/elninowang/article/details/80599908


  • Author

学习随笔,如有谬误,尽请指正,谢谢。

作者:心尘了

email: 285779289@qq.com

git:https://github.com/yujinliang

评论区

写评论
Mike Tang 2020-06-15 10:36

作者 yujinliang 2020-06-15 10:27

我选取例子的标准就是直观简明短小,可以清晰说明kad的原理精髓, 所以没有选取以太坊、ipfs之类的底层p2p来分析!因为其加入了太多设计和抽象, 对于初学者反而不容易学习到kad的精髓!淹没在没有必要的抽象和封装之中!

作者 yujinliang 2020-06-15 10:07

鉴于rust在区块链技术领域的明显优势, 所以在此分享陋文!冒昧!不知合适否, 不好意思!

对以下内容的回复:

作者 yujinliang 2020-06-15 10:04

代码例子包括golang, rust, c等, 主要在于解读每一种lookup_nodes算法实现的收敛方法!学习p2p kad协议的精髓。

1 共 4 条评论, 1 页