Added storage latency histogram metric
Signed-off-by: Alexei Dodon <adodon@cisco.com>
This commit is contained in:
parent
4f825a5e2f
commit
c4d34b7269
@ -1,4 +1,4 @@
|
||||
CONTAINER_RUNTIME := $(shell command -v podman 2> /dev/null || echo docker)
|
||||
CONTAINER_RUNTIME := docker
|
||||
|
||||
.PHONY: binary-container
|
||||
binary-container:
|
||||
|
@ -370,6 +370,49 @@ func TestNewExporter(t *testing.T) {
|
||||
|
||||
So(isChannelDrained(chMetric), ShouldEqual, true)
|
||||
})
|
||||
Convey("Collecting data: Test init value & that observe works on Histogram buckets (lock latency)", func() {
|
||||
// Testing initial value of the histogram counter to be 1 after first observation call
|
||||
latency := getRandomLatency()
|
||||
monitoring.ObserveStorageLockLatency(serverController.Metrics, latency, "/tmp/zot", "RWLock")
|
||||
time.Sleep(SleepTime)
|
||||
|
||||
go func() {
|
||||
// this blocks
|
||||
collector.Collect(chMetric)
|
||||
}()
|
||||
readDefaultMetrics(collector, chMetric)
|
||||
|
||||
pmMetric := <-chMetric
|
||||
So(pmMetric.Desc().String(), ShouldEqual, collector.MetricsDesc["zot_storage_lock_latency_seconds_count"].String())
|
||||
|
||||
var metric dto.Metric
|
||||
err := pmMetric.Write(&metric)
|
||||
So(err, ShouldBeNil)
|
||||
So(*metric.Counter.Value, ShouldEqual, 1)
|
||||
|
||||
pmMetric = <-chMetric
|
||||
So(pmMetric.Desc().String(), ShouldEqual, collector.MetricsDesc["zot_storage_lock_latency_seconds_sum"].String())
|
||||
|
||||
err = pmMetric.Write(&metric)
|
||||
So(err, ShouldBeNil)
|
||||
So(*metric.Counter.Value, ShouldEqual, latency.Seconds())
|
||||
|
||||
for _, fvalue := range monitoring.GetBuckets("zot.storage.lock.latency.seconds") {
|
||||
pmMetric = <-chMetric
|
||||
So(pmMetric.Desc().String(), ShouldEqual,
|
||||
collector.MetricsDesc["zot_storage_lock_latency_seconds_bucket"].String())
|
||||
|
||||
err = pmMetric.Write(&metric)
|
||||
So(err, ShouldBeNil)
|
||||
if latency.Seconds() < fvalue {
|
||||
So(*metric.Counter.Value, ShouldEqual, 1)
|
||||
} else {
|
||||
So(*metric.Counter.Value, ShouldEqual, 0)
|
||||
}
|
||||
}
|
||||
|
||||
So(isChannelDrained(chMetric), ShouldEqual, true)
|
||||
})
|
||||
Convey("Collecting data: Test init Histogram buckets \n", func() {
|
||||
// Generate a random latency within each bucket and finally test
|
||||
// that "higher" rank bucket counter is incremented by 1
|
||||
|
@ -76,7 +76,7 @@ func (zc Collector) Collect(ch chan<- prometheus.Metric) {
|
||||
zc.MetricsDesc[name], prometheus.CounterValue, h.Sum, h.LabelValues...)
|
||||
|
||||
if h.Buckets != nil {
|
||||
for _, fvalue := range monitoring.GetDefaultBuckets() {
|
||||
for _, fvalue := range monitoring.GetBuckets(h.Name) {
|
||||
var svalue string
|
||||
if fvalue == math.MaxFloat64 {
|
||||
svalue = "+Inf"
|
||||
|
@ -2,7 +2,6 @@ package monitoring
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"math"
|
||||
"os"
|
||||
"path/filepath"
|
||||
)
|
||||
@ -15,10 +14,6 @@ type MetricServer interface {
|
||||
IsEnabled() bool
|
||||
}
|
||||
|
||||
func GetDefaultBuckets() []float64 {
|
||||
return []float64{.05, .5, 1, 5, 30, 60, 600, math.MaxFloat64}
|
||||
}
|
||||
|
||||
func getDirSize(path string) (int64, error) {
|
||||
var size int64
|
||||
|
||||
|
@ -74,6 +74,15 @@ var (
|
||||
},
|
||||
[]string{"commit", "binaryType", "goVersion", "version"},
|
||||
)
|
||||
storageLockLatency = promauto.NewHistogramVec( // nolint: gochecknoglobals
|
||||
prometheus.HistogramOpts{
|
||||
Namespace: metricsNamespace,
|
||||
Name: "storage_lock_latency_seconds",
|
||||
Help: "Latency of serving HTTP requests",
|
||||
Buckets: GetStorageLatencyBuckets(),
|
||||
},
|
||||
[]string{"storageName", "lockType"},
|
||||
)
|
||||
)
|
||||
|
||||
type metricServer struct {
|
||||
@ -81,6 +90,14 @@ type metricServer struct {
|
||||
log log.Logger
|
||||
}
|
||||
|
||||
func GetDefaultBuckets() []float64 {
|
||||
return []float64{.05, .5, 1, 5, 30, 60, 600}
|
||||
}
|
||||
|
||||
func GetStorageLatencyBuckets() []float64 {
|
||||
return []float64{.001, .01, 0.1, 1, 5, 10, 15, 30, 60}
|
||||
}
|
||||
|
||||
func NewMetricsServer(enabled bool, log log.Logger) MetricServer {
|
||||
return &metricServer{
|
||||
enabled: enabled,
|
||||
@ -174,3 +191,9 @@ func SetServerInfo(ms MetricServer, lvalues ...string) {
|
||||
serverInfo.WithLabelValues(lvalues...).Set(0)
|
||||
})
|
||||
}
|
||||
|
||||
func ObserveStorageLockLatency(ms MetricServer, latency time.Duration, storageName, lockType string) {
|
||||
ms.SendMetric(func() {
|
||||
storageLockLatency.WithLabelValues(storageName, lockType).Observe(latency.Seconds())
|
||||
})
|
||||
}
|
||||
|
@ -27,7 +27,8 @@ const (
|
||||
// Summary.
|
||||
httpRepoLatencySeconds = metricsNamespace + ".http.repo.latency.seconds"
|
||||
// Histogram.
|
||||
httpMethodLatencySeconds = metricsNamespace + ".http.method.latency.seconds"
|
||||
httpMethodLatencySeconds = metricsNamespace + ".http.method.latency.seconds"
|
||||
storageLockLatencySeconds = metricsNamespace + ".storage.lock.latency.seconds"
|
||||
|
||||
metricsScrapeTimeout = 2 * time.Minute
|
||||
metricsScrapeCheckInterval = 30 * time.Second
|
||||
@ -87,6 +88,14 @@ type HistogramValue struct {
|
||||
LabelValues []string
|
||||
}
|
||||
|
||||
func GetDefaultBuckets() []float64 {
|
||||
return []float64{.05, .5, 1, 5, 30, 60, 600, math.MaxFloat64}
|
||||
}
|
||||
|
||||
func GetStorageLatencyBuckets() []float64 {
|
||||
return []float64{.001, .01, 0.1, 1, 5, 10, 15, 30, 60, math.MaxFloat64}
|
||||
}
|
||||
|
||||
// implements the MetricServer interface.
|
||||
func (ms *metricServer) SendMetric(metric interface{}) {
|
||||
if ms.enabled {
|
||||
@ -172,7 +181,7 @@ func NewMetricsServer(enabled bool, log log.Logger) MetricServer {
|
||||
// convert to a map for returning easily the string corresponding to a bucket
|
||||
bucketsFloat2String := map[float64]string{}
|
||||
|
||||
for _, fvalue := range GetDefaultBuckets() {
|
||||
for _, fvalue := range append(GetDefaultBuckets(), GetStorageLatencyBuckets()...) {
|
||||
if fvalue == math.MaxFloat64 {
|
||||
bucketsFloat2String[fvalue] = "+Inf"
|
||||
} else {
|
||||
@ -219,7 +228,8 @@ func GetSummaries() map[string][]string {
|
||||
|
||||
func GetHistograms() map[string][]string {
|
||||
return map[string][]string{
|
||||
httpMethodLatencySeconds: {"method"},
|
||||
httpMethodLatencySeconds: {"method"},
|
||||
storageLockLatencySeconds: {"storageName", "lockType"},
|
||||
}
|
||||
}
|
||||
|
||||
@ -366,7 +376,7 @@ func (ms *metricServer) HistogramObserve(hv *HistogramValue) {
|
||||
// The HistogramValue not found: add it
|
||||
buckets := make(map[string]int)
|
||||
|
||||
for _, fvalue := range GetDefaultBuckets() {
|
||||
for _, fvalue := range GetBuckets(hv.Name) {
|
||||
if hv.Sum <= fvalue {
|
||||
buckets[ms.bucketsF2S[fvalue]] = 1
|
||||
} else {
|
||||
@ -381,7 +391,7 @@ func (ms *metricServer) HistogramObserve(hv *HistogramValue) {
|
||||
cachedH := ms.cache.Histograms[index]
|
||||
cachedH.Count++
|
||||
cachedH.Sum += hv.Sum
|
||||
for _, fvalue := range GetDefaultBuckets() {
|
||||
for _, fvalue := range GetBuckets(hv.Name) {
|
||||
if hv.Sum <= fvalue {
|
||||
cachedH.Buckets[ms.bucketsF2S[fvalue]]++
|
||||
}
|
||||
@ -497,6 +507,25 @@ func SetServerInfo(ms MetricServer, lvs ...string) {
|
||||
ms.ForceSendMetric(info)
|
||||
}
|
||||
|
||||
func ObserveStorageLockLatency(ms MetricServer, latency time.Duration, storageName, lockType string) {
|
||||
h := HistogramValue{
|
||||
Name: storageLockLatencySeconds,
|
||||
Sum: latency.Seconds(), // convenient temporary store for Histogram latency value
|
||||
LabelNames: []string{"storageName", "lockType"},
|
||||
LabelValues: []string{storageName, lockType},
|
||||
}
|
||||
ms.SendMetric(h)
|
||||
}
|
||||
|
||||
func GetMaxIdleScrapeInterval() time.Duration {
|
||||
return metricsScrapeTimeout + metricsScrapeCheckInterval
|
||||
}
|
||||
|
||||
func GetBuckets(metricName string) []float64 {
|
||||
switch metricName {
|
||||
case storageLockLatencySeconds:
|
||||
return GetStorageLatencyBuckets()
|
||||
default:
|
||||
return GetDefaultBuckets()
|
||||
}
|
||||
}
|
||||
|
@ -12,6 +12,7 @@ import (
|
||||
"path/filepath"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
// Add s3 support.
|
||||
"github.com/docker/distribution/registry/storage/driver"
|
||||
@ -29,6 +30,11 @@ import (
|
||||
"zotregistry.io/zot/pkg/storage"
|
||||
)
|
||||
|
||||
const (
|
||||
RLOCK = "RLock"
|
||||
RWLOCK = "RWLock"
|
||||
)
|
||||
|
||||
// ObjectStorage provides the image storage operations.
|
||||
type ObjectStorage struct {
|
||||
rootDir string
|
||||
@ -73,23 +79,37 @@ func NewImageStore(rootDir string, gc bool, dedupe bool, log zlog.Logger, metric
|
||||
}
|
||||
|
||||
// RLock read-lock.
|
||||
func (is *ObjectStorage) RLock() {
|
||||
func (is *ObjectStorage) RLock(lockStart *time.Time) {
|
||||
*lockStart = time.Now()
|
||||
|
||||
is.lock.RLock()
|
||||
}
|
||||
|
||||
// RUnlock read-unlock.
|
||||
func (is *ObjectStorage) RUnlock() {
|
||||
func (is *ObjectStorage) RUnlock(lockStart *time.Time) {
|
||||
is.lock.RUnlock()
|
||||
|
||||
lockEnd := time.Now()
|
||||
// includes time spent in acquiring and holding a lock
|
||||
latency := lockEnd.Sub(*lockStart)
|
||||
monitoring.ObserveStorageLockLatency(is.metrics, latency, is.RootDir(), RLOCK) // histogram
|
||||
}
|
||||
|
||||
// Lock write-lock.
|
||||
func (is *ObjectStorage) Lock() {
|
||||
func (is *ObjectStorage) Lock(lockStart *time.Time) {
|
||||
*lockStart = time.Now()
|
||||
|
||||
is.lock.Lock()
|
||||
}
|
||||
|
||||
// Unlock write-unlock.
|
||||
func (is *ObjectStorage) Unlock() {
|
||||
func (is *ObjectStorage) Unlock(lockStart *time.Time) {
|
||||
is.lock.Unlock()
|
||||
|
||||
lockEnd := time.Now()
|
||||
// includes time spent in acquiring and holding a lock
|
||||
latency := lockEnd.Sub(*lockStart)
|
||||
monitoring.ObserveStorageLockLatency(is.metrics, latency, is.RootDir(), RWLOCK) // histogram
|
||||
}
|
||||
|
||||
func (is *ObjectStorage) initRepo(name string) error {
|
||||
@ -143,8 +163,10 @@ func (is *ObjectStorage) initRepo(name string) error {
|
||||
|
||||
// InitRepo creates an image repository under this store.
|
||||
func (is *ObjectStorage) InitRepo(name string) error {
|
||||
is.Lock()
|
||||
defer is.Unlock()
|
||||
var lockLatency time.Time
|
||||
|
||||
is.Lock(&lockLatency)
|
||||
defer is.Unlock(&lockLatency)
|
||||
|
||||
return is.initRepo(name)
|
||||
}
|
||||
@ -220,10 +242,12 @@ func (is *ObjectStorage) ValidateRepo(name string) (bool, error) {
|
||||
|
||||
// GetRepositories returns a list of all the repositories under this store.
|
||||
func (is *ObjectStorage) GetRepositories() ([]string, error) {
|
||||
var lockLatency time.Time
|
||||
|
||||
dir := is.rootDir
|
||||
|
||||
is.RLock()
|
||||
defer is.RUnlock()
|
||||
is.RLock(&lockLatency)
|
||||
defer is.RUnlock(&lockLatency)
|
||||
|
||||
stores := make([]string, 0)
|
||||
err := is.store.Walk(context.Background(), dir, func(fileInfo driver.FileInfo) error {
|
||||
@ -256,13 +280,15 @@ func (is *ObjectStorage) GetRepositories() ([]string, error) {
|
||||
|
||||
// GetImageTags returns a list of image tags available in the specified repository.
|
||||
func (is *ObjectStorage) GetImageTags(repo string) ([]string, error) {
|
||||
var lockLatency time.Time
|
||||
|
||||
dir := path.Join(is.rootDir, repo)
|
||||
if fi, err := is.store.Stat(context.Background(), dir); err != nil || !fi.IsDir() {
|
||||
return nil, zerr.ErrRepoNotFound
|
||||
}
|
||||
|
||||
is.RLock()
|
||||
defer is.RUnlock()
|
||||
is.RLock(&lockLatency)
|
||||
defer is.RUnlock(&lockLatency)
|
||||
|
||||
buf, err := is.GetIndexContent(repo)
|
||||
if err != nil {
|
||||
@ -290,13 +316,15 @@ func (is *ObjectStorage) GetImageTags(repo string) ([]string, error) {
|
||||
|
||||
// GetImageManifest returns the image manifest of an image in the specific repository.
|
||||
func (is *ObjectStorage) GetImageManifest(repo string, reference string) ([]byte, string, string, error) {
|
||||
var lockLatency time.Time
|
||||
|
||||
dir := path.Join(is.rootDir, repo)
|
||||
if fi, err := is.store.Stat(context.Background(), dir); err != nil || !fi.IsDir() {
|
||||
return nil, "", "", zerr.ErrRepoNotFound
|
||||
}
|
||||
|
||||
is.RLock()
|
||||
defer is.RUnlock()
|
||||
is.RLock(&lockLatency)
|
||||
defer is.RUnlock(&lockLatency)
|
||||
|
||||
buf, err := is.GetIndexContent(repo)
|
||||
if err != nil {
|
||||
@ -422,8 +450,10 @@ func (is *ObjectStorage) PutImageManifest(repo string, reference string, mediaTy
|
||||
refIsDigest = true
|
||||
}
|
||||
|
||||
is.Lock()
|
||||
defer is.Unlock()
|
||||
var lockLatency time.Time
|
||||
|
||||
is.Lock(&lockLatency)
|
||||
defer is.Unlock(&lockLatency)
|
||||
|
||||
dir := path.Join(is.rootDir, repo)
|
||||
|
||||
@ -526,6 +556,8 @@ func (is *ObjectStorage) PutImageManifest(repo string, reference string, mediaTy
|
||||
|
||||
// DeleteImageManifest deletes the image manifest from the repository.
|
||||
func (is *ObjectStorage) DeleteImageManifest(repo string, reference string) error {
|
||||
var lockLatency time.Time
|
||||
|
||||
dir := path.Join(is.rootDir, repo)
|
||||
if fi, err := is.store.Stat(context.Background(), dir); err != nil || !fi.IsDir() {
|
||||
return zerr.ErrRepoNotFound
|
||||
@ -541,8 +573,8 @@ func (is *ObjectStorage) DeleteImageManifest(repo string, reference string) erro
|
||||
isTag = true
|
||||
}
|
||||
|
||||
is.Lock()
|
||||
defer is.Unlock()
|
||||
is.Lock(&lockLatency)
|
||||
defer is.Unlock(&lockLatency)
|
||||
|
||||
buf, err := is.GetIndexContent(repo)
|
||||
if err != nil {
|
||||
@ -921,11 +953,8 @@ func (is *ObjectStorage) FullBlobUpload(repo string, body io.Reader, digest stri
|
||||
}
|
||||
|
||||
uuid := u.String()
|
||||
|
||||
src := is.BlobUploadPath(repo, uuid)
|
||||
|
||||
digester := sha256.New()
|
||||
|
||||
buf := new(bytes.Buffer)
|
||||
|
||||
_, err = buf.ReadFrom(body)
|
||||
@ -957,8 +986,10 @@ func (is *ObjectStorage) FullBlobUpload(repo string, body io.Reader, digest stri
|
||||
return "", -1, zerr.ErrBadBlobDigest
|
||||
}
|
||||
|
||||
is.Lock()
|
||||
defer is.Unlock()
|
||||
var lockLatency time.Time
|
||||
|
||||
is.Lock(&lockLatency)
|
||||
defer is.Unlock(&lockLatency)
|
||||
|
||||
dst := is.BlobPath(repo, dstDigest)
|
||||
|
||||
@ -995,6 +1026,8 @@ func (is *ObjectStorage) BlobPath(repo string, digest godigest.Digest) string {
|
||||
|
||||
// CheckBlob verifies a blob and returns true if the blob is correct.
|
||||
func (is *ObjectStorage) CheckBlob(repo string, digest string) (bool, int64, error) {
|
||||
var lockLatency time.Time
|
||||
|
||||
dgst, err := godigest.Parse(digest)
|
||||
if err != nil {
|
||||
is.log.Error().Err(err).Str("digest", digest).Msg("failed to parse digest")
|
||||
@ -1004,8 +1037,8 @@ func (is *ObjectStorage) CheckBlob(repo string, digest string) (bool, int64, err
|
||||
|
||||
blobPath := is.BlobPath(repo, dgst)
|
||||
|
||||
is.RLock()
|
||||
defer is.RUnlock()
|
||||
is.RLock(&lockLatency)
|
||||
defer is.RUnlock(&lockLatency)
|
||||
|
||||
binfo, err := is.store.Stat(context.Background(), blobPath)
|
||||
if err != nil {
|
||||
@ -1027,6 +1060,8 @@ func (is *ObjectStorage) CheckBlob(repo string, digest string) (bool, int64, err
|
||||
// GetBlob returns a stream to read the blob.
|
||||
// blob selector instead of directly downloading the blob.
|
||||
func (is *ObjectStorage) GetBlob(repo string, digest string, mediaType string) (io.Reader, int64, error) {
|
||||
var lockLatency time.Time
|
||||
|
||||
dgst, err := godigest.Parse(digest)
|
||||
if err != nil {
|
||||
is.log.Error().Err(err).Str("digest", digest).Msg("failed to parse digest")
|
||||
@ -1036,8 +1071,8 @@ func (is *ObjectStorage) GetBlob(repo string, digest string, mediaType string) (
|
||||
|
||||
blobPath := is.BlobPath(repo, dgst)
|
||||
|
||||
is.RLock()
|
||||
defer is.RUnlock()
|
||||
is.RLock(&lockLatency)
|
||||
defer is.RUnlock(&lockLatency)
|
||||
|
||||
binfo, err := is.store.Stat(context.Background(), blobPath)
|
||||
if err != nil {
|
||||
@ -1093,6 +1128,8 @@ func (is *ObjectStorage) GetIndexContent(repo string) ([]byte, error) {
|
||||
|
||||
// DeleteBlob removes the blob from the repository.
|
||||
func (is *ObjectStorage) DeleteBlob(repo string, digest string) error {
|
||||
var lockLatency time.Time
|
||||
|
||||
dgst, err := godigest.Parse(digest)
|
||||
if err != nil {
|
||||
is.log.Error().Err(err).Str("digest", digest).Msg("failed to parse digest")
|
||||
@ -1102,8 +1139,8 @@ func (is *ObjectStorage) DeleteBlob(repo string, digest string) error {
|
||||
|
||||
blobPath := is.BlobPath(repo, dgst)
|
||||
|
||||
is.Lock()
|
||||
defer is.Unlock()
|
||||
is.Lock(&lockLatency)
|
||||
defer is.Unlock(&lockLatency)
|
||||
|
||||
_, err = is.store.Stat(context.Background(), blobPath)
|
||||
if err != nil {
|
||||
|
@ -9,6 +9,7 @@ import (
|
||||
"os"
|
||||
"path"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/olekukonko/tablewriter"
|
||||
godigest "github.com/opencontainers/go-digest"
|
||||
@ -84,11 +85,12 @@ func checkImage(imageName string, imgStore ImageStore) ([]ScrubImageResult, erro
|
||||
if err != nil {
|
||||
return results, err
|
||||
}
|
||||
|
||||
defer oci.Close()
|
||||
|
||||
imgStore.RLock()
|
||||
defer imgStore.RUnlock()
|
||||
var lockLatency time.Time
|
||||
|
||||
imgStore.RLock(&lockLatency)
|
||||
defer imgStore.RUnlock(&lockLatency)
|
||||
|
||||
buf, err := ioutil.ReadFile(path.Join(dir, "index.json"))
|
||||
if err != nil {
|
||||
|
@ -2,6 +2,7 @@ package storage
|
||||
|
||||
import (
|
||||
"io"
|
||||
"time"
|
||||
|
||||
"github.com/notaryproject/notation-go-lib"
|
||||
"github.com/opencontainers/go-digest"
|
||||
@ -14,10 +15,10 @@ const (
|
||||
type ImageStore interface {
|
||||
DirExists(d string) bool
|
||||
RootDir() string
|
||||
RLock()
|
||||
RUnlock()
|
||||
Lock()
|
||||
Unlock()
|
||||
RLock(*time.Time)
|
||||
RUnlock(*time.Time)
|
||||
Lock(*time.Time)
|
||||
Unlock(*time.Time)
|
||||
InitRepo(name string) error
|
||||
ValidateRepo(name string) (bool, error)
|
||||
GetRepositories() ([]string, error)
|
||||
|
@ -37,6 +37,8 @@ const (
|
||||
gcDelay = 1 * time.Hour
|
||||
DefaultFilePerms = 0o600
|
||||
DefaultDirPerms = 0o700
|
||||
RLOCK = "RLock"
|
||||
RWLOCK = "RWLock"
|
||||
)
|
||||
|
||||
// BlobUpload models and upload request.
|
||||
@ -142,23 +144,35 @@ func NewImageStore(rootDir string, gc bool, dedupe bool, log zlog.Logger, metric
|
||||
}
|
||||
|
||||
// RLock read-lock.
|
||||
func (is *ImageStoreFS) RLock() {
|
||||
func (is *ImageStoreFS) RLock(lockStart *time.Time) {
|
||||
*lockStart = time.Now()
|
||||
|
||||
is.lock.RLock()
|
||||
}
|
||||
|
||||
// RUnlock read-unlock.
|
||||
func (is *ImageStoreFS) RUnlock() {
|
||||
func (is *ImageStoreFS) RUnlock(lockStart *time.Time) {
|
||||
is.lock.RUnlock()
|
||||
|
||||
lockEnd := time.Now()
|
||||
latency := lockEnd.Sub(*lockStart)
|
||||
monitoring.ObserveStorageLockLatency(is.metrics, latency, is.RootDir(), RLOCK) // histogram
|
||||
}
|
||||
|
||||
// Lock write-lock.
|
||||
func (is *ImageStoreFS) Lock() {
|
||||
func (is *ImageStoreFS) Lock(lockStart *time.Time) {
|
||||
*lockStart = time.Now()
|
||||
|
||||
is.lock.Lock()
|
||||
}
|
||||
|
||||
// Unlock write-unlock.
|
||||
func (is *ImageStoreFS) Unlock() {
|
||||
func (is *ImageStoreFS) Unlock(lockStart *time.Time) {
|
||||
is.lock.Unlock()
|
||||
|
||||
lockEnd := time.Now()
|
||||
latency := lockEnd.Sub(*lockStart)
|
||||
monitoring.ObserveStorageLockLatency(is.metrics, latency, is.RootDir(), RWLOCK) // histogram
|
||||
}
|
||||
|
||||
func (is *ImageStoreFS) initRepo(name string) error {
|
||||
@ -218,8 +232,10 @@ func (is *ImageStoreFS) initRepo(name string) error {
|
||||
|
||||
// InitRepo creates an image repository under this store.
|
||||
func (is *ImageStoreFS) InitRepo(name string) error {
|
||||
is.Lock()
|
||||
defer is.Unlock()
|
||||
var lockLatency time.Time
|
||||
|
||||
is.Lock(&lockLatency)
|
||||
defer is.Unlock(&lockLatency)
|
||||
|
||||
return is.initRepo(name)
|
||||
}
|
||||
@ -284,10 +300,12 @@ func (is *ImageStoreFS) ValidateRepo(name string) (bool, error) {
|
||||
|
||||
// GetRepositories returns a list of all the repositories under this store.
|
||||
func (is *ImageStoreFS) GetRepositories() ([]string, error) {
|
||||
var lockLatency time.Time
|
||||
|
||||
dir := is.rootDir
|
||||
|
||||
is.RLock()
|
||||
defer is.RUnlock()
|
||||
is.RLock(&lockLatency)
|
||||
defer is.RUnlock(&lockLatency)
|
||||
|
||||
_, err := ioutil.ReadDir(dir)
|
||||
if err != nil {
|
||||
@ -326,13 +344,15 @@ func (is *ImageStoreFS) GetRepositories() ([]string, error) {
|
||||
|
||||
// GetImageTags returns a list of image tags available in the specified repository.
|
||||
func (is *ImageStoreFS) GetImageTags(repo string) ([]string, error) {
|
||||
var lockLatency time.Time
|
||||
|
||||
dir := path.Join(is.rootDir, repo)
|
||||
if !is.DirExists(dir) {
|
||||
return nil, zerr.ErrRepoNotFound
|
||||
}
|
||||
|
||||
is.RLock()
|
||||
defer is.RUnlock()
|
||||
is.RLock(&lockLatency)
|
||||
defer is.RUnlock(&lockLatency)
|
||||
|
||||
buf, err := ioutil.ReadFile(path.Join(dir, "index.json"))
|
||||
if err != nil {
|
||||
@ -362,13 +382,15 @@ func (is *ImageStoreFS) GetImageTags(repo string) ([]string, error) {
|
||||
|
||||
// GetImageManifest returns the image manifest of an image in the specific repository.
|
||||
func (is *ImageStoreFS) GetImageManifest(repo string, reference string) ([]byte, string, string, error) {
|
||||
var lockLatency time.Time
|
||||
|
||||
dir := path.Join(is.rootDir, repo)
|
||||
if !is.DirExists(dir) {
|
||||
return nil, "", "", zerr.ErrRepoNotFound
|
||||
}
|
||||
|
||||
is.RLock()
|
||||
defer is.RUnlock()
|
||||
is.RLock(&lockLatency)
|
||||
defer is.RUnlock(&lockLatency)
|
||||
|
||||
buf, err := ioutil.ReadFile(path.Join(dir, "index.json"))
|
||||
if err != nil {
|
||||
@ -514,8 +536,10 @@ func (is *ImageStoreFS) PutImageManifest(repo string, reference string, mediaTyp
|
||||
refIsDigest = true
|
||||
}
|
||||
|
||||
is.Lock()
|
||||
defer is.Unlock()
|
||||
var lockLatency time.Time
|
||||
|
||||
is.Lock(&lockLatency)
|
||||
defer is.Unlock(&lockLatency)
|
||||
|
||||
dir := path.Join(is.rootDir, repo)
|
||||
|
||||
@ -633,6 +657,8 @@ func (is *ImageStoreFS) PutImageManifest(repo string, reference string, mediaTyp
|
||||
|
||||
// DeleteImageManifest deletes the image manifest from the repository.
|
||||
func (is *ImageStoreFS) DeleteImageManifest(repo string, reference string) error {
|
||||
var lockLatency time.Time
|
||||
|
||||
dir := path.Join(is.rootDir, repo)
|
||||
if !is.DirExists(dir) {
|
||||
return zerr.ErrRepoNotFound
|
||||
@ -648,8 +674,8 @@ func (is *ImageStoreFS) DeleteImageManifest(repo string, reference string) error
|
||||
isTag = true
|
||||
}
|
||||
|
||||
is.Lock()
|
||||
defer is.Unlock()
|
||||
is.Lock(&lockLatency)
|
||||
defer is.Unlock(&lockLatency)
|
||||
|
||||
buf, err := ioutil.ReadFile(path.Join(dir, "index.json"))
|
||||
if err != nil {
|
||||
@ -934,8 +960,10 @@ func (is *ImageStoreFS) FinishBlobUpload(repo string, uuid string, body io.Reade
|
||||
|
||||
dir := path.Join(is.rootDir, repo, "blobs", dstDigest.Algorithm().String())
|
||||
|
||||
is.Lock()
|
||||
defer is.Unlock()
|
||||
var lockLatency time.Time
|
||||
|
||||
is.Lock(&lockLatency)
|
||||
defer is.Unlock(&lockLatency)
|
||||
|
||||
err = ensureDir(dir, is.log)
|
||||
if err != nil {
|
||||
@ -1014,8 +1042,10 @@ func (is *ImageStoreFS) FullBlobUpload(repo string, body io.Reader, digest strin
|
||||
|
||||
dir := path.Join(is.rootDir, repo, "blobs", dstDigest.Algorithm().String())
|
||||
|
||||
is.Lock()
|
||||
defer is.Unlock()
|
||||
var lockLatency time.Time
|
||||
|
||||
is.Lock(&lockLatency)
|
||||
defer is.Unlock(&lockLatency)
|
||||
|
||||
_ = ensureDir(dir, is.log)
|
||||
dst := is.BlobPath(repo, dstDigest)
|
||||
@ -1141,6 +1171,8 @@ func (is *ImageStoreFS) BlobPath(repo string, digest godigest.Digest) string {
|
||||
|
||||
// CheckBlob verifies a blob and returns true if the blob is correct.
|
||||
func (is *ImageStoreFS) CheckBlob(repo string, digest string) (bool, int64, error) {
|
||||
var lockLatency time.Time
|
||||
|
||||
parsedDigest, err := godigest.Parse(digest)
|
||||
if err != nil {
|
||||
is.log.Error().Err(err).Str("digest", digest).Msg("failed to parse digest")
|
||||
@ -1151,11 +1183,11 @@ func (is *ImageStoreFS) CheckBlob(repo string, digest string) (bool, int64, erro
|
||||
blobPath := is.BlobPath(repo, parsedDigest)
|
||||
|
||||
if is.dedupe && is.cache != nil {
|
||||
is.Lock()
|
||||
defer is.Unlock()
|
||||
is.Lock(&lockLatency)
|
||||
defer is.Unlock(&lockLatency)
|
||||
} else {
|
||||
is.RLock()
|
||||
defer is.RUnlock()
|
||||
is.RLock(&lockLatency)
|
||||
defer is.RUnlock(&lockLatency)
|
||||
}
|
||||
|
||||
binfo, err := os.Stat(blobPath)
|
||||
@ -1233,6 +1265,8 @@ func (is *ImageStoreFS) copyBlob(repo string, blobPath string, dstRecord string)
|
||||
// GetBlob returns a stream to read the blob.
|
||||
// blob selector instead of directly downloading the blob.
|
||||
func (is *ImageStoreFS) GetBlob(repo string, digest string, mediaType string) (io.Reader, int64, error) {
|
||||
var lockLatency time.Time
|
||||
|
||||
parsedDigest, err := godigest.Parse(digest)
|
||||
if err != nil {
|
||||
is.log.Error().Err(err).Str("digest", digest).Msg("failed to parse digest")
|
||||
@ -1242,8 +1276,8 @@ func (is *ImageStoreFS) GetBlob(repo string, digest string, mediaType string) (i
|
||||
|
||||
blobPath := is.BlobPath(repo, parsedDigest)
|
||||
|
||||
is.RLock()
|
||||
defer is.RUnlock()
|
||||
is.RLock(&lockLatency)
|
||||
defer is.RUnlock(&lockLatency)
|
||||
|
||||
binfo, err := os.Stat(blobPath)
|
||||
if err != nil {
|
||||
@ -1301,6 +1335,8 @@ func (is *ImageStoreFS) GetIndexContent(repo string) ([]byte, error) {
|
||||
|
||||
// DeleteBlob removes the blob from the repository.
|
||||
func (is *ImageStoreFS) DeleteBlob(repo string, digest string) error {
|
||||
var lockLatency time.Time
|
||||
|
||||
dgst, err := godigest.Parse(digest)
|
||||
if err != nil {
|
||||
is.log.Error().Err(err).Str("digest", digest).Msg("failed to parse digest")
|
||||
@ -1310,8 +1346,8 @@ func (is *ImageStoreFS) DeleteBlob(repo string, digest string) error {
|
||||
|
||||
blobPath := is.BlobPath(repo, dgst)
|
||||
|
||||
is.Lock()
|
||||
defer is.Unlock()
|
||||
is.Lock(&lockLatency)
|
||||
defer is.Unlock(&lockLatency)
|
||||
|
||||
_, err = os.Stat(blobPath)
|
||||
if err != nil {
|
||||
@ -1338,6 +1374,8 @@ func (is *ImageStoreFS) DeleteBlob(repo string, digest string) error {
|
||||
}
|
||||
|
||||
func (is *ImageStoreFS) GetReferrers(repo, digest string, mediaType string) ([]notation.Descriptor, error) {
|
||||
var lockLatency time.Time
|
||||
|
||||
dir := path.Join(is.rootDir, repo)
|
||||
if !is.DirExists(dir) {
|
||||
return nil, zerr.ErrRepoNotFound
|
||||
@ -1350,8 +1388,8 @@ func (is *ImageStoreFS) GetReferrers(repo, digest string, mediaType string) ([]n
|
||||
return nil, zerr.ErrBadBlobDigest
|
||||
}
|
||||
|
||||
is.RLock()
|
||||
defer is.RUnlock()
|
||||
is.RLock(&lockLatency)
|
||||
defer is.RUnlock(&lockLatency)
|
||||
|
||||
buf, err := ioutil.ReadFile(path.Join(dir, "index.json"))
|
||||
if err != nil {
|
||||
|
@ -12,6 +12,7 @@ import (
|
||||
"strings"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
// Add s3 support.
|
||||
"github.com/docker/distribution/registry/storage/driver"
|
||||
@ -599,16 +600,18 @@ func TestStorageAPIs(t *testing.T) {
|
||||
for i := 0; i < 1000; i++ {
|
||||
wg.Add(2)
|
||||
go func() {
|
||||
var lockLatency time.Time
|
||||
defer wg.Done()
|
||||
imgStore.Lock()
|
||||
imgStore.Lock(&lockLatency)
|
||||
func() {}()
|
||||
imgStore.Unlock()
|
||||
imgStore.Unlock(&lockLatency)
|
||||
}()
|
||||
go func() {
|
||||
var lockLatency time.Time
|
||||
defer wg.Done()
|
||||
imgStore.RLock()
|
||||
imgStore.RLock(&lockLatency)
|
||||
func() {}()
|
||||
imgStore.RUnlock()
|
||||
imgStore.RUnlock(&lockLatency)
|
||||
}()
|
||||
}
|
||||
wg.Wait()
|
||||
|
Loading…
x
Reference in New Issue
Block a user