| // Copyright 2013 The Go Authors. All rights reserved. |
| // |
| // Use of this source code is governed by a BSD-style |
| // license that can be found in the LICENSE file or at |
| // https://developers.google.com/open-source/licenses/bsd. |
| |
| // Redis keys and types: |
| // |
| // maxPackageId string: next id to assign |
| // ids hash: maps import path to package id |
| // pkg:<id> hash |
| //      terms: space separated search terms |
| //      path: import path |
| //      synopsis: synopsis |
| //      gob: snappy compressed gob encoded doc.Package |
| //      score: document search score |
| //      etag: etag from the most recent crawl |
| //      kind: p=package, c=command, d=directory with no go files |
| //      crawl: Unix time of the next scheduled crawl |
| // index:<term> set: package ids for given search term |
| // index:import:<path> set: packages with import path |
| // index:project:<root> set: packages in project with root |
| // block set: blocked domains, repos, and packages |
| // popular zset: package id, score |
| // popular:0 string: scaled base time for popular scores |
| // nextCrawl zset: package id, Unix time for next crawl |
| // newCrawl set: new paths to crawl |
| // badCrawl set: paths that returned an error when crawling |
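| // |
| // For example, a package lookup by hand touches the keys above |
| // (sketch using redigo, with c a redis.Conn; the import path is |
| // illustrative and error handling is omitted): |
| // |
| //	id, _ := redis.String(c.Do("HGET", "ids", "github.com/golang/gddo/doc")) |
| //	info, _ := redis.Strings(c.Do("HMGET", "pkg:"+id, "path", "synopsis", "kind")) |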
| |
| // Package database manages storage for GoPkgDoc. |
| package database |
| |
| import ( |
| "bytes" |
| "context" |
| "encoding/gob" |
| "errors" |
| "fmt" |
| "log" |
| "math" |
| "net/url" |
| "os" |
| "path" |
| "sort" |
| "strconv" |
| "strings" |
| "time" |
| "unicode/utf8" |
| |
| "cloud.google.com/go/trace" |
| "github.com/garyburd/redigo/redis" |
| "github.com/golang/snappy" |
| "golang.org/x/oauth2/google" |
| "google.golang.org/appengine" |
| "google.golang.org/appengine/remote_api" |
| "google.golang.org/appengine/search" |
| |
| "github.com/golang/gddo/doc" |
| "github.com/golang/gddo/gosrc" |
| ) |
| |
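| // Database is the gddo package database, backed by Redis and, when a |
| // remote_api client is configured, an App Engine search index. |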
| type Database struct { |
| Pool interface { |
| Get() redis.Conn |
| } |
| |
| RemoteClient *remote_api.Client |
| } |
| |
| // Package represents the content of a package both for the search index and |
| // for the HTML template. It implements the search.FieldLoadSaver interface |
| // to customize the Rank function in the search index. |
| type Package struct { |
| Name string `json:"name,omitempty"` |
| Path string `json:"path"` |
| ImportCount int `json:"import_count"` |
| Synopsis string `json:"synopsis,omitempty"` |
| Fork bool `json:"fork,omitempty"` |
| Stars int `json:"stars,omitempty"` |
| Score float64 `json:"score,omitempty"` |
| } |
| |
| type byPath []Package |
| |
| func (p byPath) Len() int { return len(p) } |
| func (p byPath) Less(i, j int) bool { return p[i].Path < p[j].Path } |
| func (p byPath) Swap(i, j int) { p[i], p[j] = p[j], p[i] } |
| |
| // newDBDialer returns a function that returns connections to the Redis |
| // database at the given server URI. The URI should be of the form |
| // redis://user:pass@host:port. |
| func newDBDialer(server string, logConn bool) func() (c redis.Conn, err error) { |
| return func() (c redis.Conn, err error) { |
| u, err := url.Parse(server) |
| if err != nil { |
| return nil, err |
| } |
| |
| defer func() { |
| if err != nil && c != nil { |
| c.Close() |
| } |
| }() |
| |
| c, err = redis.Dial("tcp", u.Host) |
| if err != nil { |
| return c, err |
| } |
| |
| if logConn { |
| l := log.New(os.Stderr, "", log.LstdFlags) |
| c = redis.NewLoggingConn(c, l, "") |
| } |
| |
| if u.User != nil { |
| if pw, ok := u.User.Password(); ok { |
| if _, err = c.Do("AUTH", pw); err != nil { |
| return c, err |
| } |
| } |
| } |
| return c, err |
| } |
| } |
| |
| func newRemoteClient(host string) (*remote_api.Client, error) { |
| client, err := google.DefaultClient(context.TODO(), |
| "https://www.googleapis.com/auth/appengine.apis", |
| ) |
| if err != nil { |
| return nil, err |
| } |
| client.Transport = trace.Transport{Base: client.Transport} |
| |
| return remote_api.NewClient(host, client) |
| } |
| |
| // New creates a gddo database. serverURI, idleTimeout, and logConn |
| // configure the use of Redis. gaeEndpoint is the target of the App Engine |
| // remote_api endpoint. |
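| // |
| // A minimal sketch of typical use (argument values are illustrative): |
| // |
| //	db, err := database.New("redis://localhost:6379", 250*time.Millisecond, false, "") |
| //	if err != nil { |
| //		log.Fatal(err) |
| //	} |
| //	ok, err := db.Exists("github.com/golang/gddo/doc") |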
| func New(serverURI string, idleTimeout time.Duration, logConn bool, gaeEndpoint string) (*Database, error) { |
| pool := &redis.Pool{ |
| Dial: newDBDialer(serverURI, logConn), |
| MaxIdle: 10, |
| IdleTimeout: idleTimeout, |
| } |
| |
| var rc *remote_api.Client |
| if gaeEndpoint != "" { |
| var err error |
| if rc, err = newRemoteClient(gaeEndpoint); err != nil { |
| return nil, err |
| } |
| } else { |
| log.Println("remote_api client not setup to use App Engine search") |
| } |
| |
| return &Database{Pool: pool, RemoteClient: rc}, nil |
| } |
| |
| func (db *Database) CheckHealth() error { |
| // TODO(light): get() can trigger a dial. Ideally, the pool could |
| // inform whether or not a lack of connections is due to idleness or |
| // errors. |
| c := db.Pool.Get() |
| err := c.Err() |
| c.Close() |
| return err |
| } |
| |
| // Exists returns true if a package with the given import path exists in |
| // the database. |
| func (db *Database) Exists(path string) (bool, error) { |
| c := db.Pool.Get() |
| defer c.Close() |
| return redis.Bool(c.Do("HEXISTS", "ids", path)) |
| } |
| |
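| // putScript adds or updates a package in the database. To update the |
| // search index incrementally, it marks each previously indexed term with 1 |
| // and adds 2 for each new term: a final value of 1 means the term was |
| // dropped (SREM), 2 means it was added (SADD), and 3 means it is unchanged. |
| // If the etag matches the value of the package's 'clone' hash field, the |
| // terms and score are cleared. |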
| var putScript = redis.NewScript(0, ` |
| local path = ARGV[1] |
| local synopsis = ARGV[2] |
| local score = ARGV[3] |
| local gob = ARGV[4] |
| local terms = ARGV[5] |
| local etag = ARGV[6] |
| local kind = ARGV[7] |
| local nextCrawl = ARGV[8] |
| |
| local id = redis.call('HGET', 'ids', path) |
| if not id then |
| id = redis.call('INCR', 'maxPackageId') |
| redis.call('HSET', 'ids', path, id) |
| end |
| |
| if etag ~= '' and etag == redis.call('HGET', 'pkg:' .. id, 'clone') then |
| terms = '' |
| score = 0 |
| end |
| |
| local update = {} |
| for term in string.gmatch(redis.call('HGET', 'pkg:' .. id, 'terms') or '', '([^ ]+)') do |
| update[term] = 1 |
| end |
| |
| for term in string.gmatch(terms, '([^ ]+)') do |
| update[term] = (update[term] or 0) + 2 |
| end |
| |
| for term, x in pairs(update) do |
| if x == 1 then |
| redis.call('SREM', 'index:' .. term, id) |
| elseif x == 2 then |
| redis.call('SADD', 'index:' .. term, id) |
| end |
| end |
| |
| redis.call('SREM', 'badCrawl', path) |
| redis.call('SREM', 'newCrawl', path) |
| |
| if nextCrawl ~= '0' then |
| redis.call('ZADD', 'nextCrawl', nextCrawl, id) |
| redis.call('HSET', 'pkg:' .. id, 'crawl', nextCrawl) |
| end |
| |
| return redis.call('HMSET', 'pkg:' .. id, 'path', path, 'synopsis', synopsis, 'score', score, 'gob', gob, 'terms', terms, 'etag', etag, 'kind', kind) |
| `) |
| |
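| // addCrawlScript adds paths to the newCrawl set, skipping paths that are |
| // already in the database or that previously failed to crawl. |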
| var addCrawlScript = redis.NewScript(0, ` |
| for i=1,#ARGV do |
| local pkg = ARGV[i] |
| if redis.call('HEXISTS', 'ids', pkg) == 0 and redis.call('SISMEMBER', 'badCrawl', pkg) == 0 then |
| redis.call('SADD', 'newCrawl', pkg) |
| end |
| end |
| `) |
| |
| func (db *Database) AddNewCrawl(importPath string) error { |
| if !gosrc.IsValidRemotePath(importPath) { |
| return errors.New("bad path") |
| } |
| c := db.Pool.Get() |
| defer c.Close() |
| _, err := addCrawlScript.Do(c, importPath) |
| return err |
| } |
| |
| // Put adds the package documentation to the database. |
| func (db *Database) Put(ctx context.Context, pdoc *doc.Package, nextCrawl time.Time, hide bool) error { |
| c := db.Pool.Get() |
| defer c.Close() |
| |
| score := 0.0 |
| if !hide { |
| score = documentScore(pdoc) |
| } |
| terms := documentTerms(pdoc, score) |
| |
| var gobBuf bytes.Buffer |
| if err := gob.NewEncoder(&gobBuf).Encode(pdoc); err != nil { |
| return err |
| } |
| |
| gobBytes := snappy.Encode(nil, gobBuf.Bytes()) |
| |
| // Truncate large documents. |
| if len(gobBytes) > 1200000 { |
| pdocNew := *pdoc |
| pdoc = &pdocNew |
| pdoc.Truncated = true |
| pdoc.Vars = nil |
| pdoc.Funcs = nil |
| pdoc.Types = nil |
| pdoc.Consts = nil |
| pdoc.Examples = nil |
| gobBuf.Reset() |
| if err := gob.NewEncoder(&gobBuf).Encode(pdoc); err != nil { |
| return err |
| } |
| gobBytes = snappy.Encode(nil, gobBuf.Bytes()) |
| } |
| |
| kind := "p" |
| switch { |
| case pdoc.Name == "": |
| kind = "d" |
| case pdoc.IsCmd: |
| kind = "c" |
| } |
| |
| t := int64(0) |
| if !nextCrawl.IsZero() { |
| t = nextCrawl.Unix() |
| } |
| |
| // Get the old version of the package to extract its imports. |
| // If the package does not exist, both old and err will be nil. |
| old, _, err := db.getDoc(ctx, c, pdoc.ImportPath) |
| if err != nil { |
| return err |
| } |
| |
| _, err = putScript.Do(c, pdoc.ImportPath, pdoc.Synopsis, score, gobBytes, strings.Join(terms, " "), pdoc.Etag, kind, t) |
| if err != nil { |
| return err |
| } |
| |
| id, n, err := pkgIDAndImportCount(c, pdoc.ImportPath) |
| if err != nil { |
| return err |
| } |
| |
| if score > 0 { |
| if err := db.PutIndex(ctx, pdoc, id, score, n); err != nil { |
| log.Printf("Cannot put %q in index: %v", pdoc.ImportPath, err) |
| } |
| |
| if old != nil { |
| if err := db.updateImportsIndex(ctx, c, old, pdoc); err != nil { |
| return err |
| } |
| } |
| } else { |
| if err := db.DeleteIndex(ctx, id); err != nil { |
| return err |
| } |
| } |
| |
| if nextCrawl.IsZero() { |
| // Skip crawling related packages if this is not a full save. |
| return nil |
| } |
| |
| paths := make(map[string]bool) |
| for _, p := range pdoc.Imports { |
| if gosrc.IsValidRemotePath(p) { |
| paths[p] = true |
| } |
| } |
| for _, p := range pdoc.TestImports { |
| if gosrc.IsValidRemotePath(p) { |
| paths[p] = true |
| } |
| } |
| for _, p := range pdoc.XTestImports { |
| if gosrc.IsValidRemotePath(p) { |
| paths[p] = true |
| } |
| } |
| if pdoc.ImportPath != pdoc.ProjectRoot && pdoc.ProjectRoot != "" { |
| paths[pdoc.ProjectRoot] = true |
| } |
| for _, p := range pdoc.Subdirectories { |
| paths[pdoc.ImportPath+"/"+p] = true |
| } |
| |
| args := make([]interface{}, 0, len(paths)) |
| for p := range paths { |
| args = append(args, p) |
| } |
| _, err = addCrawlScript.Do(c, args...) |
| return err |
| } |
| |
| // pkgIDAndImportCount returns the ID of the specified package and the |
| // number of packages that import it. |
| func pkgIDAndImportCount(c redis.Conn, path string) (id string, numImported int, err error) { |
| numImported, err = redis.Int(c.Do("SCARD", "index:import:"+path)) |
| if err != nil { |
| return |
| } |
| id, err = redis.String(c.Do("HGET", "ids", path)) |
| if err == redis.ErrNil { |
| return "", 0, nil |
| } else if err != nil { |
| return "", 0, err |
| } |
| return id, numImported, nil |
| } |
| |
| func (db *Database) updateImportsIndex(ctx context.Context, c redis.Conn, oldDoc, newDoc *doc.Package) error { |
| // Collect the imports that were dropped since the package was last |
| // indexed; the importer counts of those packages must be updated. |
| changes := make(map[string]bool) |
| for _, p := range oldDoc.Imports { |
| if gosrc.IsValidRemotePath(p) { |
| changes[p] = true |
| } |
| } |
| for _, p := range newDoc.Imports { |
| if gosrc.IsValidRemotePath(p) { |
| delete(changes, p) |
| } |
| } |
| |
| // For each dropped import, re-index that package with its updated |
| // import count. In practice this should not happen often, and when it |
| // does, the number of changes is likely to be small. |
| for p := range changes { |
| id, n, err := pkgIDAndImportCount(c, p) |
| if err != nil { |
| log.Println("database.updateImportsIndex:", err) |
| } |
| if id != "" { |
| if err := db.PutIndex(ctx, nil, id, -1, n); err != nil { |
| log.Printf("database.updateImportsIndex: putting package %s into the index: %v\n", p, err) |
| } |
| } |
| } |
| return nil |
| } |
| |
| var setNextCrawlScript = redis.NewScript(0, ` |
| local path = ARGV[1] |
| local nextCrawl = ARGV[2] |
| |
| local id = redis.call('HGET', 'ids', path) |
| if not id then |
| return false |
| end |
| |
| redis.call('ZADD', 'nextCrawl', nextCrawl, id) |
| redis.call('HSET', 'pkg:' .. id, 'crawl', nextCrawl) |
| `) |
| |
| // SetNextCrawl sets the next crawl time for a package. |
| func (db *Database) SetNextCrawl(path string, t time.Time) error { |
| c := db.Pool.Get() |
| defer c.Close() |
| _, err := setNextCrawlScript.Do(c, path, t.Unix()) |
| return err |
| } |
| |
| // bumpCrawlScript sets the crawl time for all packages in a project to |
| // now. To avoid continuously crawling frequently updated repositories, the |
| // next crawl is scheduled two hours out for packages and a day out for |
| // everything else. |
| var bumpCrawlScript = redis.NewScript(0, ` |
| local root = ARGV[1] |
| local now = tonumber(ARGV[2]) |
| local pkgs = redis.call('SORT', 'index:project:' .. root, 'GET', '#') |
| |
| for i=1,#pkgs do |
| local v = redis.call('HMGET', 'pkg:' .. pkgs[i], 'crawl', 'kind') |
| local t = tonumber(v[1] or 0) |
| if t == 0 or now < t then |
| redis.call('HSET', 'pkg:' .. pkgs[i], 'crawl', now) |
| end |
| local nextCrawl = now + 86400 |
| if v[2] == 'p' then |
| nextCrawl = now + 7200 |
| end |
| t = tonumber(redis.call('ZSCORE', 'nextCrawl', pkgs[i]) or 0) |
| if t == 0 or nextCrawl < t then |
| redis.call('ZADD', 'nextCrawl', nextCrawl, pkgs[i]) |
| end |
| end |
| `) |
| |
| func (db *Database) BumpCrawl(projectRoot string) error { |
| c := db.Pool.Get() |
| defer c.Close() |
| _, err := bumpCrawlScript.Do(c, normalizeProjectRoot(projectRoot), time.Now().Unix()) |
| return err |
| } |
| |
| // getDocScript gets the gob-encoded package documentation and next crawl |
| // time for the specified path. If path is "-", the package with the |
| // earliest next crawl time is returned. |
| var getDocScript = redis.NewScript(0, ` |
| local path = ARGV[1] |
| |
| local id |
| if path == '-' then |
| local r = redis.call('ZRANGE', 'nextCrawl', 0, 0) |
| if not r or #r == 0 then |
| return false |
| end |
| id = r[1] |
| else |
| id = redis.call('HGET', 'ids', path) |
| if not id then |
| return false |
| end |
| end |
| |
| local gob = redis.call('HGET', 'pkg:' .. id, 'gob') |
| if not gob then |
| return false |
| end |
| |
| local nextCrawl = redis.call('HGET', 'pkg:' .. id, 'crawl') |
| if not nextCrawl then |
| nextCrawl = redis.call('ZSCORE', 'nextCrawl', id) |
| if not nextCrawl then |
| nextCrawl = 0 |
| end |
| end |
| |
| return {gob, nextCrawl} |
| `) |
| |
| func (db *Database) getDoc(ctx context.Context, c redis.Conn, path string) (*doc.Package, time.Time, error) { |
| r, err := redis.Values(getDocScript.Do(c, path)) |
| if err == redis.ErrNil { |
| return nil, time.Time{}, nil |
| } else if err != nil { |
| return nil, time.Time{}, err |
| } |
| |
| var p []byte |
| var t int64 |
| |
| if _, err := redis.Scan(r, &p, &t); err != nil { |
| return nil, time.Time{}, err |
| } |
| |
| p, err = snappy.Decode(nil, p) |
| if err != nil { |
| return nil, time.Time{}, err |
| } |
| |
| var pdoc doc.Package |
| if err := gob.NewDecoder(bytes.NewReader(p)).Decode(&pdoc); err != nil { |
| return nil, time.Time{}, err |
| } |
| |
| nextCrawl := pdoc.Updated |
| if t != 0 { |
| nextCrawl = time.Unix(t, 0).UTC() |
| } |
| |
| return &pdoc, nextCrawl, err |
| } |
| |
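| // getSubdirsScript returns (path, synopsis, kind) triples for all packages |
| // under the first of the given project roots that has any indexed packages. |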
| var getSubdirsScript = redis.NewScript(0, ` |
| local reply |
| for i = 1,#ARGV do |
| reply = redis.call('SORT', 'index:project:' .. ARGV[i], 'ALPHA', 'BY', 'pkg:*->path', 'GET', 'pkg:*->path', 'GET', 'pkg:*->synopsis', 'GET', 'pkg:*->kind') |
| if #reply > 0 then |
| break |
| end |
| end |
| return reply |
| `) |
| |
| func (db *Database) getSubdirs(c redis.Conn, path string, pdoc *doc.Package) ([]Package, error) { |
| var reply interface{} |
| var err error |
| |
| switch { |
| case isStandardPackage(path): |
| reply, err = getSubdirsScript.Do(c, "go") |
| case pdoc != nil: |
| reply, err = getSubdirsScript.Do(c, pdoc.ProjectRoot) |
| default: |
| var roots []interface{} |
| projectRoot := path |
| for i := 0; i < 5; i++ { |
| roots = append(roots, projectRoot) |
| if j := strings.LastIndex(projectRoot, "/"); j < 0 { |
| break |
| } else { |
| projectRoot = projectRoot[:j] |
| } |
| } |
| reply, err = getSubdirsScript.Do(c, roots...) |
| } |
| |
| values, err := redis.Values(reply, err) |
| if err != nil { |
| return nil, err |
| } |
| |
| var subdirs []Package |
| prefix := path + "/" |
| |
| for len(values) > 0 { |
| var pkg Package |
| var kind string |
| values, err = redis.Scan(values, &pkg.Path, &pkg.Synopsis, &kind) |
| if err != nil { |
| return nil, err |
| } |
| if (kind == "p" || kind == "c") && strings.HasPrefix(pkg.Path, prefix) { |
| subdirs = append(subdirs, pkg) |
| } |
| } |
| |
| return subdirs, err |
| } |
| |
| // Get gets the package documentation and subdirectories for the given |
| // import path. |
| func (db *Database) Get(ctx context.Context, path string) (*doc.Package, []Package, time.Time, error) { |
| c := db.Pool.Get() |
| defer c.Close() |
| |
| pdoc, nextCrawl, err := db.getDoc(ctx, c, path) |
| if err != nil { |
| return nil, nil, time.Time{}, err |
| } |
| |
| if pdoc != nil { |
| // Fix up for the special "-" path. |
| path = pdoc.ImportPath |
| } |
| |
| subdirs, err := db.getSubdirs(c, path, pdoc) |
| if err != nil { |
| return nil, nil, time.Time{}, err |
| } |
| return pdoc, subdirs, nextCrawl, nil |
| } |
| |
| func (db *Database) GetDoc(ctx context.Context, path string) (*doc.Package, time.Time, error) { |
| c := db.Pool.Get() |
| defer c.Close() |
| return db.getDoc(ctx, c, path) |
| } |
| |
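| // deleteScript removes a package and every reference to it: the search |
| // term sets, the crawl queues, the popular set, the package hash, and the |
| // id mapping. |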
| var deleteScript = redis.NewScript(0, ` |
| local path = ARGV[1] |
| |
| local id = redis.call('HGET', 'ids', path) |
| if not id then |
| return false |
| end |
| |
| for term in string.gmatch(redis.call('HGET', 'pkg:' .. id, 'terms') or '', '([^ ]+)') do |
| redis.call('SREM', 'index:' .. term, id) |
| end |
| |
| redis.call('ZREM', 'nextCrawl', id) |
| redis.call('SREM', 'newCrawl', path) |
| redis.call('ZREM', 'popular', id) |
| redis.call('DEL', 'pkg:' .. id) |
| return redis.call('HDEL', 'ids', path) |
| `) |
| |
| // Delete deletes the documentation for the given import path. |
| func (db *Database) Delete(ctx context.Context, path string) error { |
| c := db.Pool.Get() |
| defer c.Close() |
| |
| id, err := redis.String(c.Do("HGET", "ids", path)) |
| if err == redis.ErrNil { |
| return nil |
| } |
| if err != nil { |
| return err |
| } |
| if err := db.DeleteIndex(ctx, id); err != nil { |
| return err |
| } |
| |
| _, err = deleteScript.Do(c, path) |
| return err |
| } |
| |
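| // packages scans a flat reply of (path, synopsis, kind) triples into a |
| // slice of Packages, skipping directories unless all is true. |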
| func packages(reply interface{}, all bool) ([]Package, error) { |
| values, err := redis.Values(reply, nil) |
| if err != nil { |
| return nil, err |
| } |
| result := make([]Package, 0, len(values)/3) |
| for len(values) > 0 { |
| var pkg Package |
| var kind string |
| values, err = redis.Scan(values, &pkg.Path, &pkg.Synopsis, &kind) |
| if err != nil { |
| return nil, err |
| } |
| if !all && kind == "d" { |
| continue |
| } |
| if pkg.Path == "C" { |
| pkg.Synopsis = "Package C is a \"pseudo-package\" used to access the C namespace from a cgo source file." |
| } |
| result = append(result, pkg) |
| } |
| return result, nil |
| } |
| |
| func (db *Database) getPackages(key string, all bool) ([]Package, error) { |
| c := db.Pool.Get() |
| defer c.Close() |
| reply, err := c.Do("SORT", key, "ALPHA", "BY", "pkg:*->path", "GET", "pkg:*->path", "GET", "pkg:*->synopsis", "GET", "pkg:*->kind") |
| if err != nil { |
| return nil, err |
| } |
| return packages(reply, all) |
| } |
| |
| func (db *Database) GoIndex() ([]Package, error) { |
| return db.getPackages("index:project:go", false) |
| } |
| |
| func (db *Database) GoSubrepoIndex() ([]Package, error) { |
| return db.getPackages("index:project:subrepo", false) |
| } |
| |
| func (db *Database) Index() ([]Package, error) { |
| return db.getPackages("index:all:", false) |
| } |
| |
| func (db *Database) Project(projectRoot string) ([]Package, error) { |
| return db.getPackages("index:project:"+normalizeProjectRoot(projectRoot), true) |
| } |
| |
| func (db *Database) AllPackages() ([]Package, error) { |
| c := db.Pool.Get() |
| defer c.Close() |
| values, err := redis.Values(c.Do("SORT", "nextCrawl", "DESC", "BY", "pkg:*->score", "GET", "pkg:*->path", "GET", "pkg:*->kind")) |
| if err != nil { |
| return nil, err |
| } |
| result := make([]Package, 0, len(values)/2) |
| for len(values) > 0 { |
| var pkg Package |
| var kind string |
| values, err = redis.Scan(values, &pkg.Path, &kind) |
| if err != nil { |
| return nil, err |
| } |
| if kind == "d" { |
| continue |
| } |
| result = append(result, pkg) |
| } |
| return result, nil |
| } |
| |
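| // packagesScript returns a (path, synopsis, kind) triple for each of the |
| // given import paths, reporting kind 'u' for paths not in the database. |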
| var packagesScript = redis.NewScript(0, ` |
| local result = {} |
| for i = 1,#ARGV do |
| local path = ARGV[i] |
| local synopsis = '' |
| local kind = 'u' |
| local id = redis.call('HGET', 'ids', path) |
| if id then |
| synopsis = redis.call('HGET', 'pkg:' .. id, 'synopsis') |
| kind = redis.call('HGET', 'pkg:' .. id, 'kind') |
| end |
| result[#result+1] = path |
| result[#result+1] = synopsis |
| result[#result+1] = kind |
| end |
| return result |
| `) |
| |
| func (db *Database) Packages(paths []string) ([]Package, error) { |
| var args []interface{} |
| for _, p := range paths { |
| args = append(args, p) |
| } |
| c := db.Pool.Get() |
| defer c.Close() |
| reply, err := packagesScript.Do(c, args...) |
| if err != nil { |
| return nil, err |
| } |
| pkgs, err := packages(reply, true) |
| sort.Sort(byPath(pkgs)) |
| return pkgs, err |
| } |
| |
| func (db *Database) ImporterCount(path string) (int, error) { |
| c := db.Pool.Get() |
| defer c.Close() |
| return redis.Int(c.Do("SCARD", "index:import:"+path)) |
| } |
| |
| func (db *Database) Importers(path string) ([]Package, error) { |
| return db.getPackages("index:import:"+path, false) |
| } |
| |
| // Block puts a domain, repo, or package into the block set, removes all |
| // packages under it from the database, and prevents it from being crawled |
| // in the future. |
| func (db *Database) Block(root string) error { |
| c := db.Pool.Get() |
| defer c.Close() |
| if _, err := c.Do("SADD", "block", root); err != nil { |
| return err |
| } |
| |
| // Remove all packages under the project root. |
| keys, err := redis.Strings(c.Do("HKEYS", "ids")) |
| if err != nil { |
| return err |
| } |
| ctx := context.Background() |
| for _, key := range keys { |
| if key == root || strings.HasPrefix(key, root) && key[len(root)] == '/' { |
| id, err := redis.String(c.Do("HGET", "ids", key)) |
| if err != nil { |
| return fmt.Errorf("cannot get package id for %s: %v", key, err) |
| } |
| if _, err := deleteScript.Do(c, key); err != nil { |
| return err |
| } |
| if err := db.DeleteIndex(ctx, id); err != nil && err != search.ErrNoSuchDocument { |
| return err |
| } |
| } |
| } |
| |
| // Remove all packages in the newCrawl set under the project root. |
| newCrawls, err := redis.Strings(c.Do("SORT", "newCrawl", "BY", "nosort")) |
| if err != nil { |
| return fmt.Errorf("cannot list newCrawl: %v", err) |
| } |
| for _, nc := range newCrawls { |
| if nc == root || strings.HasPrefix(nc, root) && nc[len(root)] == '/' { |
| if _, err := deleteScript.Do(c, nc); err != nil { |
| return err |
| } |
| } |
| } |
| return nil |
| } |
| |
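| // isBlockedScript reports whether any '/'-separated prefix of the path is |
| // in the block set. |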
| var isBlockedScript = redis.NewScript(0, ` |
| local path = '' |
| for s in string.gmatch(ARGV[1], '[^/]+') do |
| path = path .. s |
| if redis.call('SISMEMBER', 'block', path) == 1 then |
| return 1 |
| end |
| path = path .. '/' |
| end |
| return 0 |
| `) |
| |
| // IsBlocked returns whether the package is blocked or belongs to a blocked |
| // domain/repo. |
| func (db *Database) IsBlocked(path string) (bool, error) { |
| c := db.Pool.Get() |
| defer c.Close() |
| return redis.Bool(isBlockedScript.Do(c, path)) |
| } |
| |
| type queryResult struct { |
| Path string |
| Synopsis string |
| Score float64 |
| } |
| |
| type byScore []*queryResult |
| |
| func (p byScore) Len() int { return len(p) } |
| func (p byScore) Less(i, j int) bool { return p[j].Score < p[i].Score } |
| func (p byScore) Swap(i, j int) { p[i], p[j] = p[j], p[i] } |
| |
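| // Query returns the packages matching all terms in q, ranked by score. |
| // Scores are weighted by importer counts and boosted for standard packages |
| // and exact name matches. For example (illustrative): |
| // |
| //	pkgs, err := db.Query("json encoder") |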
| func (db *Database) Query(q string) ([]Package, error) { |
| terms := parseQuery(q) |
| if len(terms) == 0 { |
| return nil, nil |
| } |
| c := db.Pool.Get() |
| defer c.Close() |
| n, err := redis.Int(c.Do("INCR", "maxQueryId")) |
| if err != nil { |
| return nil, err |
| } |
| id := "tmp:query-" + strconv.Itoa(n) |
| |
| args := []interface{}{id} |
| for _, term := range terms { |
| args = append(args, "index:"+term) |
| } |
| c.Send("SINTERSTORE", args...) |
| c.Send("SORT", id, "DESC", "BY", "nosort", "GET", "pkg:*->path", "GET", "pkg:*->synopsis", "GET", "pkg:*->score") |
| c.Send("DEL", id) |
| c.Flush() |
| c.Receive() // SINTERSTORE |
| values, err := redis.Values(c.Receive()) // SORT |
| if err != nil { |
| return nil, err |
| } |
| c.Receive() // DEL |
| |
| var queryResults []*queryResult |
| if err := redis.ScanSlice(values, &queryResults, "Path", "Synopsis", "Score"); err != nil { |
| return nil, err |
| } |
| |
| for _, qr := range queryResults { |
| c.Send("SCARD", "index:import:"+qr.Path) |
| } |
| c.Flush() |
| |
| for _, qr := range queryResults { |
| importCount, err := redis.Int(c.Receive()) |
| if err != nil { |
| return nil, err |
| } |
| |
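| // Weight the stored document score by popularity. math.Log is the |
| // natural log, so a package with no importers keeps a factor of |
| // ln(10) ≈ 2.3 and the factor grows slowly with importer count. |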
| qr.Score *= math.Log(float64(10 + importCount)) |
| |
| if isStandardPackage(qr.Path) { |
| if strings.HasSuffix(qr.Path, q) { |
| // Big bump for exact match on standard package name. |
| qr.Score *= 10000 |
| } else { |
| qr.Score *= 1.2 |
| } |
| } |
| |
| if q == path.Base(qr.Path) { |
| qr.Score *= 1.1 |
| } |
| } |
| |
| sort.Sort(byScore(queryResults)) |
| |
| pkgs := make([]Package, len(queryResults)) |
| for i, qr := range queryResults { |
| pkgs[i].Path = qr.Path |
| pkgs[i].Synopsis = qr.Synopsis |
| } |
| |
| return pkgs, nil |
| } |
| |
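| // PackageInfo is the information passed to the function f by Do for each |
| // package in the database. |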
| type PackageInfo struct { |
| PDoc *doc.Package |
| Score float64 |
| Kind string |
| Size int |
| } |
| |
| // Do executes function f for each document in the database. |
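| // |
| // For example (sketch; the body is illustrative): |
| // |
| //	err := db.Do(func(pi *database.PackageInfo) error { |
| //		fmt.Println(pi.PDoc.ImportPath, pi.Size) |
| //		return nil |
| //	}) |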
| func (db *Database) Do(f func(*PackageInfo) error) error { |
| c := db.Pool.Get() |
| defer c.Close() |
| cursor := 0 |
| c.Send("SCAN", cursor, "MATCH", "pkg:*") |
| c.Flush() |
| for { |
| // Receive previous SCAN. |
| values, err := redis.Values(c.Receive()) |
| if err != nil { |
| return err |
| } |
| var keys [][]byte |
| if _, err := redis.Scan(values, &cursor, &keys); err != nil { |
| return err |
| } |
| // A SCAN iteration is complete once the returned cursor is 0; process |
| // the final batch of keys before stopping. |
| done := cursor == 0 |
| for _, key := range keys { |
| c.Send("HMGET", key, "gob", "score", "kind", "path", "terms", "synopsis") |
| } |
| if !done { |
| c.Send("SCAN", cursor, "MATCH", "pkg:*") |
| } |
| c.Flush() |
| for range keys { |
| values, err := redis.Values(c.Receive()) |
| if err != nil { |
| return err |
| } |
| |
| var ( |
| pi PackageInfo |
| p []byte |
| path string |
| terms string |
| synopsis string |
| ) |
| |
| if _, err := redis.Scan(values, &p, &pi.Score, &pi.Kind, &path, &terms, &synopsis); err != nil { |
| return err |
| } |
| |
| if p == nil { |
| continue |
| } |
| |
| pi.Size = len(path) + len(p) + len(terms) + len(synopsis) |
| |
| p, err = snappy.Decode(nil, p) |
| if err != nil { |
| return fmt.Errorf("snappy decoding %s: %v", path, err) |
| } |
| |
| if err := gob.NewDecoder(bytes.NewReader(p)).Decode(&pi.PDoc); err != nil { |
| return fmt.Errorf("gob decoding %s: %v", path, err) |
| } |
| if err := f(&pi); err != nil { |
| return fmt.Errorf("func %s: %v", path, err) |
| } |
| } |
| if done { |
| break |
| } |
| } |
| return nil |
| } |
| |
| var importGraphScript = redis.NewScript(0, ` |
| local path = ARGV[1] |
| |
| local id = redis.call('HGET', 'ids', path) |
| if not id then |
| return false |
| end |
| |
| return redis.call('HMGET', 'pkg:' .. id, 'synopsis', 'terms') |
| `) |
| |
| // DepLevel specifies the level of dependencies to show in an import graph. |
| type DepLevel int |
| |
| const ( |
| ShowAllDeps DepLevel = iota // show all dependencies |
| HideStandardDeps // don't show dependencies of standard libraries |
| HideStandardAll // don't show standard libraries at all |
| ) |
| |
| func (db *Database) ImportGraph(pdoc *doc.Package, level DepLevel) ([]Package, [][2]int, error) { |
| |
| // This breadth-first traversal of the package's dependencies uses the |
| // Redis pipeline as a queue. Links to packages with invalid import paths |
| // are only included for the root package. |
| |
| c := db.Pool.Get() |
| defer c.Close() |
| if err := importGraphScript.Load(c); err != nil { |
| return nil, nil, err |
| } |
| |
| nodes := []Package{{Path: pdoc.ImportPath, Synopsis: pdoc.Synopsis}} |
| edges := [][2]int{} |
| index := map[string]int{pdoc.ImportPath: 0} |
| |
| for _, path := range pdoc.Imports { |
| if level >= HideStandardAll && isStandardPackage(path) { |
| continue |
| } |
| j := len(nodes) |
| index[path] = j |
| edges = append(edges, [2]int{0, j}) |
| nodes = append(nodes, Package{Path: path}) |
| importGraphScript.Send(c, path) |
| } |
| |
| for i := 1; i < len(nodes); i++ { |
| c.Flush() |
| r, err := redis.Values(c.Receive()) |
| if err == redis.ErrNil { |
| continue |
| } else if err != nil { |
| return nil, nil, err |
| } |
| var synopsis, terms string |
| if _, err := redis.Scan(r, &synopsis, &terms); err != nil { |
| return nil, nil, err |
| } |
| nodes[i].Synopsis = synopsis |
| for _, term := range strings.Fields(terms) { |
| if strings.HasPrefix(term, "import:") { |
| path := term[len("import:"):] |
| if level >= HideStandardDeps && isStandardPackage(path) { |
| continue |
| } |
| j, ok := index[path] |
| if !ok { |
| j = len(nodes) |
| index[path] = j |
| nodes = append(nodes, Package{Path: path}) |
| importGraphScript.Send(c, path) |
| } |
| edges = append(edges, [2]int{i, j}) |
| } |
| } |
| } |
| return nodes, edges, nil |
| } |
| |
| func (db *Database) PutGob(key string, value interface{}) error { |
| var buf bytes.Buffer |
| if err := gob.NewEncoder(&buf).Encode(value); err != nil { |
| return err |
| } |
| c := db.Pool.Get() |
| defer c.Close() |
| _, err := c.Do("SET", "gob:"+key, buf.Bytes()) |
| return err |
| } |
| |
| func (db *Database) GetGob(key string, value interface{}) error { |
| c := db.Pool.Get() |
| defer c.Close() |
| p, err := redis.Bytes(c.Do("GET", "gob:"+key)) |
| if err == redis.ErrNil { |
| return nil |
| } else if err != nil { |
| return err |
| } |
| return gob.NewDecoder(bytes.NewReader(p)).Decode(value) |
| } |
| |
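| // incrementPopularScoreScript increments a package's popular score. Rather |
| // than decaying every score on each update, new increments are scaled up by |
| // e^(t-t0), where t0 is the base time stored in popular:0. Once the scale |
| // factor exceeds 10, every score is rebased by dividing by the factor, and |
| // entries that have decayed below 0.05 are dropped. |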
| var incrementPopularScoreScript = redis.NewScript(0, ` |
| local path = ARGV[1] |
| local n = ARGV[2] |
| local t = ARGV[3] |
| |
| local id = redis.call('HGET', 'ids', path) |
| if not id then |
| return |
| end |
| |
| local t0 = redis.call('GET', 'popular:0') or '0' |
| local f = math.exp(tonumber(t) - tonumber(t0)) |
| redis.call('ZINCRBY', 'popular', tonumber(n) * f, id) |
| if f > 10 then |
| redis.call('SET', 'popular:0', t) |
| redis.call('ZUNIONSTORE', 'popular', 1, 'popular', 'WEIGHTS', 1.0 / f) |
| redis.call('ZREMRANGEBYSCORE', 'popular', '-inf', 0.05) |
| end |
| `) |
| |
| const popularHalfLife = time.Hour * 24 * 7 |
| |
| func (db *Database) incrementPopularScoreInternal(path string, delta float64, t time.Time) error { |
| // nt = n0 * math.Exp(-lambda * t) |
| // lambda = math.Ln2 / thalf |
| c := db.Pool.Get() |
| defer c.Close() |
| const lambda = math.Ln2 / float64(popularHalfLife) |
| scaledTime := lambda * float64(t.Sub(time.Unix(1257894000, 0))) |
| _, err := incrementPopularScoreScript.Do(c, path, delta, scaledTime) |
| return err |
| } |
| |
| func (db *Database) IncrementPopularScore(path string) error { |
| return db.incrementPopularScoreInternal(path, 1, time.Now()) |
| } |
| |
| var popularScript = redis.NewScript(0, ` |
| local stop = ARGV[1] |
| local ids = redis.call('ZREVRANGE', 'popular', '0', stop) |
| local result = {} |
| for i=1,#ids do |
| local values = redis.call('HMGET', 'pkg:' .. ids[i], 'path', 'synopsis', 'kind') |
| result[#result+1] = values[1] |
| result[#result+1] = values[2] |
| result[#result+1] = values[3] |
| end |
| return result |
| `) |
| |
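| // Popular returns the count most popular packages by descending popular |
| // score. |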
| func (db *Database) Popular(count int) ([]Package, error) { |
| c := db.Pool.Get() |
| defer c.Close() |
| reply, err := popularScript.Do(c, count-1) |
| if err != nil { |
| return nil, err |
| } |
| pkgs, err := packages(reply, false) |
| return pkgs, err |
| } |
| |
| var popularWithScoreScript = redis.NewScript(0, ` |
| local ids = redis.call('ZREVRANGE', 'popular', '0', -1, 'WITHSCORES') |
| local result = {} |
| for i=1,#ids,2 do |
| result[#result+1] = redis.call('HGET', 'pkg:' .. ids[i], 'path') |
| result[#result+1] = ids[i+1] |
| result[#result+1] = 'p' |
| end |
| return result |
| `) |
| |
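| // PopularWithScores returns all packages in the popular set together with |
| // their scores, in descending order. |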
| func (db *Database) PopularWithScores() ([]Package, error) { |
| c := db.Pool.Get() |
| defer c.Close() |
| reply, err := popularWithScoreScript.Do(c) |
| if err != nil { |
| return nil, err |
| } |
| pkgs, err := packages(reply, false) |
| return pkgs, err |
| } |
| |
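| // PopNewCrawl removes a random path from the newCrawl set and reports |
| // whether the path has subdirectories in the database. |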
| func (db *Database) PopNewCrawl() (string, bool, error) { |
| c := db.Pool.Get() |
| defer c.Close() |
| |
| var subdirs []Package |
| |
| path, err := redis.String(c.Do("SPOP", "newCrawl")) |
| switch { |
| case err == redis.ErrNil: |
| err = nil |
| path = "" |
| case err == nil: |
| subdirs, err = db.getSubdirs(c, path, nil) |
| } |
| return path, len(subdirs) > 0, err |
| } |
| |
| func (db *Database) AddBadCrawl(path string) error { |
| c := db.Pool.Get() |
| defer c.Close() |
| _, err := c.Do("SADD", "badCrawl", path) |
| return err |
| } |
| |
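| // incrementCounterScript maintains an exponentially decaying counter: the |
| // stored count is decayed to the current scaled time before the delta is |
| // added; the key expires if not updated (the caller passes four |
| // half-lives). |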
| var incrementCounterScript = redis.NewScript(0, ` |
| local key = 'counter:' .. ARGV[1] |
| local n = tonumber(ARGV[2]) |
| local t = tonumber(ARGV[3]) |
| local exp = tonumber(ARGV[4]) |
| |
| local counter = redis.call('GET', key) |
| if counter then |
| counter = cjson.decode(counter) |
| n = n + counter.n * math.exp(counter.t - t) |
| end |
| |
| redis.call('SET', key, cjson.encode({n = n; t = t})) |
| redis.call('EXPIRE', key, exp) |
| return tostring(n) |
| `) |
| |
| const counterHalflife = time.Hour |
| |
| func (db *Database) incrementCounterInternal(key string, delta float64, t time.Time) (float64, error) { |
| // nt = n0 * math.Exp(-lambda * t) |
| // lambda = math.Ln2 / thalf |
| c := db.Pool.Get() |
| defer c.Close() |
| d := strconv.FormatFloat(delta, 'f', -1, 64) |
| const lambda = math.Ln2 / float64(counterHalflife) |
| scaledTime := strconv.FormatFloat(lambda*float64(t.Sub(time.Unix(1257894000, 0))), 'f', -1, 64) |
| return redis.Float64(incrementCounterScript.Do(c, key, d, scaledTime, int((4*counterHalflife)/time.Second))) |
| } |
| |
| func (db *Database) IncrementCounter(key string, delta float64) (float64, error) { |
| return db.incrementCounterInternal(key, delta, time.Now()) |
| } |
| |
| // Reindex gets all packages in the database and puts them into the search |
| // index, updating the path, synopsis, score, and import count of each |
| // package. |
| func (db *Database) Reindex(ctx context.Context) error { |
| if db.RemoteClient == nil { |
| return errors.New("database.Reindex: no App Engine endpoint given") |
| } |
| |
| c := db.Pool.Get() |
| defer c.Close() |
| |
| idx, err := search.Open("packages") |
| if err != nil { |
| return fmt.Errorf("database: failed to open packages: %v", err) |
| } |
| npkgs := 0 |
| for { |
| // Get 200 packages from the nextCrawl set at a time, using npkgs as a |
| // cursor for the position actually indexed so far. Retry from the cursor |
| // position if App Engine returns a timeout error. |
| values, err := redis.Values(c.Do( |
| "SORT", "nextCrawl", |
| "LIMIT", strconv.Itoa(npkgs), "200", |
| "GET", "pkg:*->path", |
| "GET", "pkg:*->synopsis", |
| "GET", "pkg:*->score", |
| )) |
| if err != nil { |
| return err |
| } |
| if len(values) == 0 { |
| break // all done |
| } |
| |
| // The Search API supports putting documents in batches of up to 200, but |
| // the Go version of the API does not support this yet. |
| // TODO(shantuo): Put packages in batch operations. |
| for ; len(values) > 0; npkgs++ { |
| var pdoc doc.Package |
| var score float64 |
| values, err = redis.Scan(values, &pdoc.ImportPath, &pdoc.Synopsis, &score) |
| if err != nil { |
| return err |
| } |
| // Some corrupted data in the database causes errors when putting |
| // packages into the search index, which only supports UTF-8 encoded |
| // strings. |
| if !utf8.ValidString(pdoc.Synopsis) { |
| pdoc.Synopsis = "" |
| } |
| id, n, err := pkgIDAndImportCount(c, pdoc.ImportPath) |
| if err != nil { |
| return err |
| } |
| if _, err := idx.Put(db.RemoteClient.NewContext(ctx), id, &Package{ |
| Path: pdoc.ImportPath, |
| Synopsis: pdoc.Synopsis, |
| Score: score, |
| ImportCount: n, |
| }); err != nil { |
| if appengine.IsTimeoutError(err) { |
| log.Printf("App Engine timeout: %v. Continue...", err) |
| break |
| } |
| return fmt.Errorf("Failed to put index %s: %v", id, err) |
| } |
| } |
| } |
| log.Printf("%d packages are reindexed", npkgs) |
| return nil |
| } |
| |
| func (db *Database) Search(ctx context.Context, q string) ([]Package, error) { |
| if db.RemoteClient == nil { |
| return nil, errors.New("remote_api client not set up to use App Engine search") |
| } |
| return searchAE(db.RemoteClient.NewContext(ctx), q) |
| } |
| |
| // PutIndex puts a package into the App Engine search index. id is the |
| // package's ID in the database. It is a no-op when running locally without |
| // setting up remote_api. |
| func (db *Database) PutIndex(ctx context.Context, pdoc *doc.Package, id string, score float64, importCount int) error { |
| if db.RemoteClient == nil { |
| return nil |
| } |
| return putIndex(db.RemoteClient.NewContext(ctx), pdoc, id, score, importCount) |
| } |
| |
| // DeleteIndex deletes a package from the App Engine search index. id is |
| // the package's ID in the database. It is a no-op when running locally |
| // without setting up remote_api. |
| func (db *Database) DeleteIndex(ctx context.Context, id string) error { |
| if db.RemoteClient == nil { |
| return nil |
| } |
| return deleteIndex(db.RemoteClient.NewContext(ctx), id) |
| } |