123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296 |
- # Copyright (C) 2020 Savoir-Faire Linux
- #
- # SPDX-License-Identifier: GPL-2.0-only
- #
- """
- BitBake 'Fetch' npm implementation
- The npm fetcher supports SRC_URI entries in the following format:
- SRC_URI = "npm://some.registry.url;OptionA=xxx;OptionB=xxx;..."
- Supported SRC_URI options are:
- - package
- The npm package name. This is a mandatory parameter.
- - version
- The npm package version. This is a mandatory parameter.
- - downloadfilename
- Specifies the filename used when storing the downloaded file.
- - destsuffix
- Specifies the directory to use to unpack the package (default: npm).
- """
- import base64
- import json
- import os
- import re
- import shlex
- import tempfile
- import bb
- from bb.fetch2 import Fetch
- from bb.fetch2 import FetchError
- from bb.fetch2 import FetchMethod
- from bb.fetch2 import MissingParameterError
- from bb.fetch2 import ParameterError
- from bb.fetch2 import URI
- from bb.fetch2 import check_network_access
- from bb.fetch2 import runfetchcmd
- from bb.utils import is_semver
def npm_package(package):
    """Return the npm package name with unsupported characters removed.

    Unscoped names are returned unchanged. Scoped names (those starting
    with '@') lose the leading '@' and have every '/' turned into '-',
    which is the same naming convention as the 'npm pack' command.
    """
    if not package.startswith("@"):
        return package
    scoped_name = package[1:]
    return scoped_name.replace("/", "-")
def npm_filename(package, version):
    """Return the tarball filename of a npm package.

    The result has the form '<converted package name>-<version>.tgz',
    where the package name is converted by npm_package().
    """
    stem = "-".join((npm_package(package), version))
    return stem + ".tgz"
def npm_localfile(package, version):
    """Return the local filename of a npm package.

    The tarball filename from npm_filename() is placed under the 'npm2'
    sub-directory.
    """
    tarball_name = npm_filename(package, version)
    return os.path.join("npm2", tarball_name)
def npm_integrity(integrity):
    """
    Get the checksum name and expected value from the subresource integrity
    https://www.w3.org/TR/SRI/

    The integrity metadata may contain several whitespace-separated hash
    expressions of the form '<algo>-<base64 digest>[?options]'. When more
    than one is present the strongest known algorithm is used (the first
    one wins on a tie); any '?options' suffix is ignored.

    Returns a ('<algo>sum', '<hex digest>') tuple.

    Raises ValueError when the metadata is empty or when an expression has
    no '-' separator.
    """
    # Known algorithms ordered from weakest to strongest. An unknown
    # algorithm ranks below all of them but is still returned when it is
    # the only one listed, as before.
    strength = ("md5", "sha1", "sha256", "sha384", "sha512")

    chosen = None
    chosen_rank = None
    for expression in integrity.split():
        algo, separator, value = expression.partition("-")
        if not separator:
            raise ValueError("Invalid integrity expression: %r" % expression)
        # '?' is not part of the base64 alphabet: it introduces the options
        value = value.split("?", 1)[0]
        rank = strength.index(algo) if algo in strength else -1
        if chosen is None or rank > chosen_rank:
            chosen = (algo, value)
            chosen_rank = rank

    if chosen is None:
        raise ValueError("Empty integrity metadata")

    algo, value = chosen
    return "%ssum" % algo, base64.b64decode(value).hex()
def npm_unpack(tarball, destdir, d):
    """Extract a npm tarball into destdir.

    The destination directory is created first, then 'tar' is run from
    inside it through runfetchcmd(). The first path component of every
    archive member is stripped and the archive ownership is not restored.
    """
    bb.utils.mkdirhier(destdir)
    tar_args = [
        "tar",
        "--extract",
        "--gzip",
        "--file=%s" % shlex.quote(tarball),
        "--no-same-owner",
        "--strip-components=1",
    ]
    runfetchcmd(" ".join(tar_args), d, workdir=destdir)
class NpmEnvironment(object):
    """
    Controlled environment for running npm commands.

    Settings are written to a throw-away npm config file rather than passed
    as cli arguments, which seems more reliable.
    """

    def __init__(self, d, configs=None):
        """Remember the datastore and the default (key, value) configs."""
        self.d = d
        self.configs = configs

    def run(self, cmd, args=None, configs=None, workdir=None):
        """Run a npm command in a controlled environment.

        A temporary directory is used as HOME (on a copy of the datastore)
        and holds the 'npmrc' file that serves as both the user and the
        global npm config. The instance configs, then the 'configs' given
        here, are applied with 'npm config set'. Each (key, value) of
        'args' is appended to 'cmd' as ' --key=value' before it is run.
        The command runs in 'workdir', or in the temporary directory when
        no workdir is given. The output of runfetchcmd() is returned.
        """
        with tempfile.TemporaryDirectory() as tmpdir:
            datastore = bb.data.createCopy(self.d)
            datastore.setVar("HOME", tmpdir)

            npmrc = os.path.join(tmpdir, "npmrc")
            rundir = workdir if workdir else tmpdir

            # Point both the global and the user config at the same file
            env_prefix = "NPM_CONFIG_GLOBALCONFIG=%s NPM_CONFIG_USERCONFIG=%s " % (npmrc, npmrc)

            def _execute(command):
                return runfetchcmd(env_prefix + command, datastore, workdir=rundir)

            # Instance-level configs first, then the per-call ones
            for group in (self.configs, configs):
                if group:
                    for key, value in group:
                        _execute("npm config set %s %s" % (key, shlex.quote(value)))

            if args:
                for key, value in args:
                    cmd += " --%s=%s" % (key, shlex.quote(value))

            return _execute(cmd)
class Npm(FetchMethod):
    """Fetch method that retrieves a package from a npm registry."""

    def supports(self, ud, d):
        """Tell whether the given url data can be fetched with npm."""
        return ud.type == "npm"

    def urldata_init(self, ud, d):
        """Initialise the npm specific variables within the url data.

        Raises MissingParameterError when 'package' or 'version' is missing
        or empty, and ParameterError when 'version' is neither a semver
        nor the literal 'latest'.
        """
        ud.package = None
        ud.version = None
        ud.registry = None

        # The 'package' parameter is mandatory
        ud.package = ud.parm.get("package")
        if not ud.package:
            raise MissingParameterError("Parameter 'package' required", ud.url)

        # The 'version' parameter is mandatory
        ud.version = ud.parm.get("version")
        if not ud.version:
            raise MissingParameterError("Parameter 'version' required", ud.url)

        if not (is_semver(ud.version) or ud.version == "latest"):
            raise ParameterError("Invalid 'version' parameter", ud.url)

        # The registry is the url without its parameters, served over http
        url_without_params = ud.url.split(";")[0]
        ud.registry = re.sub(r"^npm://", "http://", url_without_params)

        # The local filename is the 'downloadfilename' parameter when given,
        # otherwise it is derived from the npm package name and version.
        if "downloadfilename" not in ud.parm:
            ud.localfile = npm_localfile(ud.package, ud.version)
        else:
            ud.localfile = d.expand(ud.parm["downloadfilename"])

        # Base 'npm' command
        ud.basecmd = d.getVar("FETCHCMD_npm") or "npm"

        # This fetcher resolves a URI from a npm package name and version and
        # then forwards it to a proxy fetcher. A resolve file containing the
        # resolved URI is created to avoid unwanted network access (if the file
        # already exists). The management of the donestamp file, the lockfile
        # and the checksums are forwarded to the proxy fetcher.
        ud.proxy = None
        ud.needdonestamp = False
        ud.resolvefile = self.localpath(ud, d) + ".resolved"

    def _resolve_proxy_url(self, ud, d):
        """Query the registry for the tarball url and store it in the resolve file."""

        def _npm_view():
            """Return the parsed 'npm view' json of the requested package."""
            view_configs = [
                ("json", "true"),
                ("registry", ud.registry),
            ]
            pkgver = shlex.quote(ud.package + "@" + ud.version)
            cmd = ud.basecmd + " view %s" % pkgver
            env = NpmEnvironment(d)
            check_network_access(d, cmd, ud.registry)
            output = env.run(cmd, configs=view_configs)

            if not output:
                raise FetchError("Unavailable package %s" % pkgver, ud.url)

            # Any failure below, including the errors raised on purpose,
            # is reported as a FetchError by the handler.
            try:
                view = json.loads(output)

                error = view.get("error")
                if error is not None:
                    raise FetchError(error.get("summary"), ud.url)

                if ud.version == "latest":
                    bb.warn("The npm package %s is using the latest "
                            "version available. This could lead to "
                            "non-reproducible builds." % pkgver)
                elif ud.version != view.get("version"):
                    raise ParameterError("Invalid 'version' parameter", ud.url)

                return view
            except Exception as e:
                raise FetchError("Invalid view from npm: %s" % str(e), ud.url)

        def _get_url(view):
            """Build the proxy url, checksum included, from the package view."""
            dist = view.get("dist", {})

            tarball_url = dist.get("tarball")
            if tarball_url is None:
                raise FetchError("Invalid 'dist.tarball' in view", ud.url)

            uri = URI(tarball_url)
            uri.params["downloadfilename"] = ud.localfile

            integrity = dist.get("integrity")
            shasum = dist.get("shasum")

            # Prefer the subresource integrity, fall back on the sha1 sum
            if integrity is not None:
                name, expected = npm_integrity(integrity)
                uri.params[name] = expected
            elif shasum is not None:
                uri.params["sha1sum"] = shasum
            else:
                raise FetchError("Invalid 'dist.integrity' in view", ud.url)

            return str(uri)

        resolved_url = _get_url(_npm_view())

        bb.utils.mkdirhier(os.path.dirname(ud.resolvefile))
        with open(ud.resolvefile, "w") as f:
            f.write(resolved_url)

    def _setup_proxy(self, ud, d):
        """Create the proxy fetcher from the resolve file, once per url data."""
        if ud.proxy is not None:
            return

        if not os.path.exists(ud.resolvefile):
            self._resolve_proxy_url(ud, d)

        with open(ud.resolvefile, "r") as f:
            resolved_url = f.read()

        # Avoid conflicts between the environment data and:
        # - the proxy url checksum
        proxy_data = bb.data.createCopy(d)
        proxy_data.delVarFlags("SRC_URI")
        ud.proxy = Fetch([resolved_url], proxy_data)

    def _get_proxy_method(self, ud, d):
        """Return the (method, url data, datastore) triple of the proxy fetcher."""
        self._setup_proxy(ud, d)
        proxy = ud.proxy
        proxy_ud = proxy.ud[proxy.urls[0]]
        proxy_d = proxy.d
        proxy_ud.setup_localpath(proxy_d)
        return proxy_ud.method, proxy_ud, proxy_d

    def verify_donestamp(self, ud, d):
        """Verify the donestamp file through the proxy fetcher."""
        method, proxy_ud, proxy_d = self._get_proxy_method(ud, d)
        return method.verify_donestamp(proxy_ud, proxy_d)

    def update_donestamp(self, ud, d):
        """Update the donestamp file through the proxy fetcher."""
        method, proxy_ud, proxy_d = self._get_proxy_method(ud, d)
        method.update_donestamp(proxy_ud, proxy_d)

    def need_update(self, ud, d):
        """Force a fetch, even if localpath exists ?"""
        # Nothing resolved yet, or a moving 'latest' version: always update
        if not os.path.exists(ud.resolvefile) or ud.version == "latest":
            return True
        method, proxy_ud, proxy_d = self._get_proxy_method(ud, d)
        return method.need_update(proxy_ud, proxy_d)

    def try_mirrors(self, fetch, ud, d, mirrors):
        """Try to use a mirror through the proxy fetcher."""
        method, proxy_ud, proxy_d = self._get_proxy_method(ud, d)
        return method.try_mirrors(fetch, proxy_ud, proxy_d, mirrors)

    def download(self, ud, d):
        """Fetch the url with the proxy fetcher."""
        self._setup_proxy(ud, d)
        ud.proxy.download()

    def unpack(self, ud, rootdir, d):
        """Unpack the downloaded archive.

        The archive goes to the 'destsuffix' sub-directory of rootdir
        (default: npm).
        """
        destdir = os.path.join(rootdir, ud.parm.get("destsuffix", "npm"))
        npm_unpack(ud.localpath, destdir, d)

    def clean(self, ud, d):
        """Clean any existing full or partial download."""
        if not os.path.exists(ud.resolvefile):
            return
        self._setup_proxy(ud, d)
        ud.proxy.clean()
        bb.utils.remove(ud.resolvefile)

    def done(self, ud, d):
        """Is the download done ?"""
        if os.path.exists(ud.resolvefile):
            method, proxy_ud, proxy_d = self._get_proxy_method(ud, d)
            return method.done(proxy_ud, proxy_d)
        return False
|