Хеш-функция для целочисленных ключей в конкурентной (concurrent) map на Go

Я использую конкурентную map из этого репозитория, которая поддерживает только строковые ключи и не имеет реализации для целочисленного ключа (int64). Поэтому я попытался реализовать её сам: просто заменил везде string на int64 и изменил хеш-функцию, как показано ниже. Изначально мне помогли на SO (Stack Overflow) с тем, как использовать хеш-функцию, а теперь я прошу ревью кода, чтобы понять, как её улучшить.

import (
    "encoding/binary"
    "encoding/json"
    "hash/maphash"
    "sync"
)

var (
	// SHARD_COUNT is the number of shards New allocates for each map.
	// Set it before the first call to New and do not change it afterwards.
	SHARD_COUNT = 32

	// seed is the maphash seed GetShard hashes keys with. It is created once
	// per process (maphash.MakeSeed returns a random seed), so a key always
	// maps to the same shard within a run, though not necessarily across runs.
	seed = maphash.MakeSeed()
)

// ConcurrentMap is a goroutine-safe map from int64 keys to arbitrary values.
// To reduce lock contention the entries are divided across several
// independently locked shards (SHARD_COUNT of them when created with New).
type ConcurrentMap []*ConcurrentMapShared

// ConcurrentMapShared is a single shard of a ConcurrentMap: a plain map from
// int64 keys to arbitrary values, guarded by its own read/write mutex.
type ConcurrentMapShared struct {
	items        map[int64]interface{}
	sync.RWMutex // Read/write mutex guarding every access to items.
}

// New returns a ready-to-use ConcurrentMap made of SHARD_COUNT empty shards.
// Always obtain maps from New: a zero-value ConcurrentMap has no shards.
func New() ConcurrentMap {
	shards := make(ConcurrentMap, SHARD_COUNT)
	for i := range shards {
		shards[i] = &ConcurrentMapShared{items: make(map[int64]interface{})}
	}
	return shards
}

// GetShard returns the shard responsible for key.
//
// The key's 8 big-endian bytes are hashed with maphash under the package-wide
// seed, and the hash is reduced modulo the number of shards actually present
// in m. A single process-wide seed is sufficient: it only has to stay constant
// for the lifetime of the process so the same key always lands in the same
// shard.
//
// It panics if m has no shards (i.e. was not created with New).
func (m ConcurrentMap) GetShard(key int64) *ConcurrentMapShared {
	if len(m) == 0 {
		panic(`cmap.ConcurrentMap is not initialized. Should run New() before usage.`)
	}
	// Encode directly into a stack buffer: same bytes binary.Write(BigEndian)
	// would produce, without the generic writer path or an ignored error.
	var buf [8]byte
	binary.BigEndian.PutUint64(buf[:], uint64(key))

	var h maphash.Hash
	h.SetSeed(seed)
	h.Write(buf[:]) // maphash.Hash.Write always returns len(b), nil.

	// Use len(m), not SHARD_COUNT: SHARD_COUNT is a mutable package variable
	// and may no longer match the number of shards this map was built with.
	return m[h.Sum64()%uint64(len(m))]
}

// MSet stores every key/value pair of data in the map.
// Each pair is written under its own shard's lock, one at a time, so the batch
// as a whole is not atomic.
func (m ConcurrentMap) MSet(data map[int64]interface{}) {
	for k, v := range data {
		m.Set(k, v)
	}
}

// Set stores value under key, replacing any value already stored there.
func (m ConcurrentMap) Set(key int64, value interface{}) {
	target := m.GetShard(key)
	target.Lock()
	target.items[key] = value
	target.Unlock()
}

// UpsertCb computes the value that Upsert stores under a key.
//
// exist reports whether the key was already present, valueInMap is its current
// value (nil when absent), and newValue is the value passed to Upsert.
//
// The callback runs while the shard's write lock is held, so it MUST NOT access
// this map: any key may live in the same shard, and Go's sync.RWMutex is not
// reentrant, so such an access can deadlock.
type UpsertCb func(exist bool, valueInMap interface{}, newValue interface{}) interface{}

// Upsert inserts or updates key. It calls cb with the key's current state and
// value, stores cb's result under key, and returns that result. The whole
// read-modify-write happens under the shard's write lock.
func (m ConcurrentMap) Upsert(key int64, value interface{}, cb UpsertCb) (res interface{}) {
	shard := m.GetShard(key)
	shard.Lock()
	// cb is caller code; deferring the unlock keeps a panicking callback from
	// leaving the shard locked forever (which would hang every later access).
	defer shard.Unlock()
	v, ok := shard.items[key]
	res = cb(ok, v, value)
	shard.items[key] = res
	return res
}

// SetIfAbsent stores value under key only when key is not already present.
// It reports whether the value was stored.
func (m ConcurrentMap) SetIfAbsent(key int64, value interface{}) bool {
	target := m.GetShard(key)
	target.Lock()
	_, present := target.items[key]
	stored := !present
	if stored {
		target.items[key] = value
	}
	target.Unlock()
	return stored
}

// Get returns the value stored under key and whether the key was present.
func (m ConcurrentMap) Get(key int64) (interface{}, bool) {
	target := m.GetShard(key)
	target.RLock()
	defer target.RUnlock()
	val, found := target.items[key]
	return val, found
}

// Count returns the number of entries across all shards of the map.
//
// Each shard is read-locked only while its length is read, so under concurrent
// writes the total is not an atomic snapshot of the whole map.
func (m ConcurrentMap) Count() int {
	count := 0
	// Range over m itself rather than 0..SHARD_COUNT: SHARD_COUNT is a mutable
	// package variable and may not match the number of shards in m.
	for _, shard := range m {
		shard.RLock()
		count += len(shard.items)
		shard.RUnlock()
	}
	return count
}

// Has reports whether key is present in the map.
func (m ConcurrentMap) Has(key int64) bool {
	_, found := m.Get(key)
	return found
}

// Remove deletes key from the map; removing an absent key is a no-op.
func (m ConcurrentMap) Remove(key int64) {
	target := m.GetShard(key)
	target.Lock()
	defer target.Unlock()
	delete(target.items, key)
}

// RemoveCb is the callback type used by ConcurrentMap.RemoveCb. It receives the
// key, its current value (nil when absent) and whether it exists, and returns
// true if the entry should be removed.
//
// It runs while the shard's write lock is held, so it must not access this map
// (sync.RWMutex is not reentrant; such an access can deadlock).
type RemoveCb func(key int64, v interface{}, exists bool) bool

// RemoveCb locks the shard containing key, looks up its current value and calls
// cb with (key, value, exists). If cb returns true and the key exists, the
// entry is deleted. It returns cb's result, even when the key was absent.
func (m ConcurrentMap) RemoveCb(key int64, cb RemoveCb) bool {
	shard := m.GetShard(key)
	shard.Lock()
	// cb is caller code; deferring the unlock keeps a panicking callback from
	// leaving the shard locked forever.
	defer shard.Unlock()
	v, ok := shard.items[key]
	remove := cb(key, v, ok)
	if remove && ok {
		delete(shard.items, key)
	}
	return remove
}

// Pop deletes key from the map and returns the value it held together with
// whether the key was present.
func (m ConcurrentMap) Pop(key int64) (v interface{}, exists bool) {
	target := m.GetShard(key)
	target.Lock()
	defer target.Unlock()
	v, exists = target.items[key]
	delete(target.items, key)
	return v, exists
}

// IsEmpty reports whether the map holds no entries.
//
// It stops at the first non-empty shard instead of counting every shard. As
// with Count, shards are inspected one at a time, so the answer is not atomic
// with respect to concurrent writers.
func (m ConcurrentMap) IsEmpty() bool {
	for _, shard := range m {
		shard.RLock()
		n := len(shard.items)
		shard.RUnlock()
		if n > 0 {
			return false
		}
	}
	return true
}

// Tuple pairs a key with its value. It is the element type of the channels
// returned by Iter and IterBuffered.
type Tuple struct {
	Key int64       // entry key
	Val interface{} // entry value
}

// Iter returns an unbuffered channel that yields every entry of the map and is
// closed once all entries have been sent; use it in a for range loop. Drain it
// fully: abandoning it early leaves the goroutines feeding it blocked.
//
// Deprecated: IterBuffered gives better performance.
func (m ConcurrentMap) Iter() <-chan Tuple {
	shardChans := snapshot(m)
	out := make(chan Tuple)
	go fanIn(shardChans, out)
	return out
}

// IterBuffered returns a channel that yields every entry of the map and is
// closed once all entries have been sent; use it in a for range loop. The
// channel is buffered to hold every entry of the snapshot.
func (m ConcurrentMap) IterBuffered() <-chan Tuple {
	shardChans := snapshot(m)
	var capacity int
	for _, c := range shardChans {
		capacity += cap(c)
	}
	out := make(chan Tuple, capacity)
	go fanIn(shardChans, out)
	return out
}

// Clear removes all entries from the map.
//
// Each shard is emptied under its own write lock by replacing its backing map,
// instead of snapshotting every entry through goroutines and channels and then
// deleting keys one by one. Shards are cleared one after another, so entries
// written to an already-cleared shard while Clear runs survive. For a map with
// no shards this is a no-op.
func (m ConcurrentMap) Clear() {
	for _, shard := range m {
		shard.Lock()
		shard.items = make(map[int64]interface{})
		shard.Unlock()
	}
}

// snapshot starts one goroutine per shard of m. Each goroutine copies its
// shard's entries into a dedicated buffered channel. snapshot returns those
// channels, one per shard, in shard order.
//
// Each goroutine sizes its channel to the shard's length while holding the
// shard's read lock, so its sends never block. snapshot returns as soon as
// every channel has been created, possibly before the channels are fully
// populated, and each channel is closed once its shard has been copied. Every
// shard is captured consistently, but not all shards at the same instant.
//
// It panics if m has no shards (i.e. was not created with New).
func snapshot(m ConcurrentMap) (chans []chan Tuple) {
	// Accessing map items before initializing.
	if len(m) == 0 {
		panic(`cmap.ConcurrentMap is not initialized. Should run New() before usage.`)
	}
	// Size everything from len(m), not SHARD_COUNT: SHARD_COUNT is a mutable
	// package variable, and a mismatch would index out of range, drive the
	// WaitGroup negative, or make wg.Wait block forever.
	chans = make([]chan Tuple, len(m))
	var wg sync.WaitGroup
	wg.Add(len(m))
	for index, shard := range m {
		go func(index int, shard *ConcurrentMapShared) {
			shard.RLock()
			ch := make(chan Tuple, len(shard.items))
			chans[index] = ch
			wg.Done() // channel exists; the caller may proceed
			for key, val := range shard.items {
				ch <- Tuple{Key: key, Val: val}
			}
			shard.RUnlock()
			close(ch)
		}(index, shard)
	}
	wg.Wait()
	return chans
}

// fanIn forwards every element received on the channels in chans to out, using
// one goroutine per input channel, and closes out once all inputs are closed
// and drained.
func fanIn(chans []chan Tuple, out chan Tuple) {
	var wg sync.WaitGroup
	for _, src := range chans {
		wg.Add(1)
		go func(src chan Tuple) {
			defer wg.Done()
			for t := range src {
				out <- t
			}
		}(src)
	}
	wg.Wait()
	close(out)
}

// Items returns a copy of all entries as a map[int64]interface{}.
//
// Each shard is copied under its read lock, directly into a result pre-sized
// from Count. This avoids the goroutines and channels of IterBuffered. Shards
// are copied one after another, so the result is not an atomic snapshot of the
// whole map under concurrent writes.
func (m ConcurrentMap) Items() map[int64]interface{} {
	tmp := make(map[int64]interface{}, m.Count())
	for _, shard := range m {
		shard.RLock()
		for key, val := range shard.items {
			tmp[key] = val
		}
		shard.RUnlock()
	}
	return tmp
}

// IterCb is the callback type used by ConcurrentMap.IterCb; it is called once
// for every key/value pair in the map. The shard's read lock is held during all
// calls for that shard, so the callback sees a consistent view of each shard,
// but not across shards. It should not access this map: writing to the same
// shard deadlocks, and even a nested read lock can deadlock while a writer is
// waiting.
type IterCb func(key int64, v interface{})

// IterCb calls fn for every entry in the map, shard by shard. It is the
// cheapest way to read all elements, because nothing is copied.
func (m ConcurrentMap) IterCb(fn IterCb) {
	for _, shard := range m {
		// The closure scopes the deferred unlock to a single shard, so a
		// panicking fn cannot leave that shard read-locked forever.
		func() {
			shard.RLock()
			defer shard.RUnlock()
			for key, value := range shard.items {
				fn(key, value)
			}
		}()
	}
}

// Keys returns all keys in the map as a []int64, in unspecified order.
//
// Shards are read one after another, each under its own read lock. This
// replaces the previous goroutine-per-shard plus channel scheme, whose
// wg.Add(SHARD_COUNT) could mismatch len(m) and deadlock or panic.
func (m ConcurrentMap) Keys() []int64 {
	keys := make([]int64, 0, m.Count())
	for _, shard := range m {
		shard.RLock()
		for key := range shard.items {
			keys = append(keys, key)
		}
		shard.RUnlock()
	}
	return keys
}

// MarshalJSON implements json.Marshaler. It exposes the map's unexported shard
// contents by encoding the copy of all entries returned by Items as one JSON
// object; encoding/json writes the int64 keys as decimal strings.
func (m ConcurrentMap) MarshalJSON() ([]byte, error) {
	return json.Marshal(m.Items())
}

// fnv32 returns the 32-bit FNV-1 hash of key's bytes (multiply by the prime,
// then XOR in each byte).
//
// NOTE(review): nothing in this int64-keyed file calls it; it looks like a
// leftover from the string-keyed version. Remove it if no other code uses it.
func fnv32(key string) uint32 {
	const (
		offset32 = uint32(2166136261)
		prime32  = uint32(16777619)
	)
	h := offset32
	for _, c := range []byte(key) {
		h = (h * prime32) ^ uint32(c)
	}
	return h
}

Вот gist с приведённым выше кодом. Я прошу ревью, чтобы узнать, что здесь можно улучшить.

  • В особенности метод GetShard(), внутри которого я использую maphash.Hash. Мне кажется, его следует инициализировать сразу при создании новой карты в функции New(), но я не понимаю, как это сделать.
  • Кроме того, я объявил глобальную переменную var seed = maphash.MakeSeed(). Думаю, её тоже следует инициализировать в методе New(), а затем использовать во всех остальных методах.

Я думаю, нужна структура для хранения всех этих значений, которая заполнялась бы в New(), но не уверен, как всё это сделать.

0

Добавить комментарий

Ваш адрес email не будет опубликован. Обязательные поля помечены *