process.py 23 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672
  1. #
  2. # BitBake Process based server.
  3. #
  4. # Copyright (C) 2010 Bob Foerster <robert@erafx.com>
  5. #
  6. # SPDX-License-Identifier: GPL-2.0-only
  7. #
  8. """
  9. This module implements a multiprocessing.Process based server for bitbake.
  10. """
  11. import bb
  12. import bb.event
  13. import logging
  14. import multiprocessing
  15. import threading
  16. import array
  17. import os
  18. import sys
  19. import time
  20. import select
  21. import socket
  22. import subprocess
  23. import errno
  24. import re
  25. import datetime
  26. import bb.server.xmlrpcserver
  27. from bb import daemonize
  28. from multiprocessing import queues
  29. logger = logging.getLogger('BitBake')
  30. class ProcessTimeout(SystemExit):
  31. pass
  32. class ProcessServer(multiprocessing.Process):
  33. profile_filename = "profile.log"
  34. profile_processed_filename = "profile.log.processed"
  35. def __init__(self, lock, sock, sockname):
  36. multiprocessing.Process.__init__(self)
  37. self.command_channel = False
  38. self.command_channel_reply = False
  39. self.quit = False
  40. self.heartbeat_seconds = 1 # default, BB_HEARTBEAT_EVENT will be checked once we have a datastore.
  41. self.next_heartbeat = time.time()
  42. self.event_handle = None
  43. self.haveui = False
  44. self.maxuiwait = 30
  45. self.xmlrpc = False
  46. self._idlefuns = {}
  47. self.bitbake_lock = lock
  48. self.sock = sock
  49. self.sockname = sockname
  50. def register_idle_function(self, function, data):
  51. """Register a function to be called while the server is idle"""
  52. assert hasattr(function, '__call__')
  53. self._idlefuns[function] = data
  54. def run(self):
  55. if self.xmlrpcinterface[0]:
  56. self.xmlrpc = bb.server.xmlrpcserver.BitBakeXMLRPCServer(self.xmlrpcinterface, self.cooker, self)
  57. print("Bitbake XMLRPC server address: %s, server port: %s" % (self.xmlrpc.host, self.xmlrpc.port))
  58. heartbeat_event = self.cooker.data.getVar('BB_HEARTBEAT_EVENT')
  59. if heartbeat_event:
  60. try:
  61. self.heartbeat_seconds = float(heartbeat_event)
  62. except:
  63. bb.warn('Ignoring invalid BB_HEARTBEAT_EVENT=%s, must be a float specifying seconds.' % heartbeat_event)
  64. self.timeout = self.server_timeout or self.cooker.data.getVar('BB_SERVER_TIMEOUT')
  65. try:
  66. if self.timeout:
  67. self.timeout = float(self.timeout)
  68. except:
  69. bb.warn('Ignoring invalid BB_SERVER_TIMEOUT=%s, must be a float specifying seconds.' % self.timeout)
  70. try:
  71. self.bitbake_lock.seek(0)
  72. self.bitbake_lock.truncate()
  73. if self.xmlrpc:
  74. self.bitbake_lock.write("%s %s:%s\n" % (os.getpid(), self.xmlrpc.host, self.xmlrpc.port))
  75. else:
  76. self.bitbake_lock.write("%s\n" % (os.getpid()))
  77. self.bitbake_lock.flush()
  78. except Exception as e:
  79. print("Error writing to lock file: %s" % str(e))
  80. pass
  81. if self.cooker.configuration.profile:
  82. try:
  83. import cProfile as profile
  84. except:
  85. import profile
  86. prof = profile.Profile()
  87. ret = profile.Profile.runcall(prof, self.main)
  88. prof.dump_stats("profile.log")
  89. bb.utils.process_profilelog("profile.log")
  90. print("Raw profiling information saved to profile.log and processed statistics to profile.log.processed")
  91. else:
  92. ret = self.main()
  93. return ret
  94. def main(self):
  95. self.cooker.pre_serve()
  96. bb.utils.set_process_name("Cooker")
  97. ready = []
  98. newconnections = []
  99. self.controllersock = False
  100. fds = [self.sock]
  101. if self.xmlrpc:
  102. fds.append(self.xmlrpc)
  103. print("Entering server connection loop")
  104. def disconnect_client(self, fds):
  105. print("Disconnecting Client")
  106. if self.controllersock:
  107. fds.remove(self.controllersock)
  108. self.controllersock.close()
  109. self.controllersock = False
  110. if self.haveui:
  111. fds.remove(self.command_channel)
  112. bb.event.unregister_UIHhandler(self.event_handle, True)
  113. self.command_channel_reply.writer.close()
  114. self.event_writer.writer.close()
  115. self.command_channel.close()
  116. self.command_channel = False
  117. del self.event_writer
  118. self.lastui = time.time()
  119. self.cooker.clientComplete()
  120. self.haveui = False
  121. ready = select.select(fds,[],[],0)[0]
  122. if newconnections:
  123. print("Starting new client")
  124. conn = newconnections.pop(-1)
  125. fds.append(conn)
  126. self.controllersock = conn
  127. elif self.timeout is None and not ready:
  128. print("No timeout, exiting.")
  129. self.quit = True
  130. self.lastui = time.time()
  131. while not self.quit:
  132. if self.sock in ready:
  133. while select.select([self.sock],[],[],0)[0]:
  134. controllersock, address = self.sock.accept()
  135. if self.controllersock:
  136. print("Queuing %s (%s)" % (str(ready), str(newconnections)))
  137. newconnections.append(controllersock)
  138. else:
  139. print("Accepting %s (%s)" % (str(ready), str(newconnections)))
  140. self.controllersock = controllersock
  141. fds.append(controllersock)
  142. if self.controllersock in ready:
  143. try:
  144. print("Processing Client")
  145. ui_fds = recvfds(self.controllersock, 3)
  146. print("Connecting Client")
  147. # Where to write events to
  148. writer = ConnectionWriter(ui_fds[0])
  149. self.event_handle = bb.event.register_UIHhandler(writer, True)
  150. self.event_writer = writer
  151. # Where to read commands from
  152. reader = ConnectionReader(ui_fds[1])
  153. fds.append(reader)
  154. self.command_channel = reader
  155. # Where to send command return values to
  156. writer = ConnectionWriter(ui_fds[2])
  157. self.command_channel_reply = writer
  158. self.haveui = True
  159. except (EOFError, OSError):
  160. disconnect_client(self, fds)
  161. if not self.timeout == -1.0 and not self.haveui and self.timeout and \
  162. (self.lastui + self.timeout) < time.time():
  163. print("Server timeout, exiting.")
  164. self.quit = True
  165. # If we don't see a UI connection within maxuiwait, its unlikely we're going to see
  166. # one. We have had issue with processes hanging indefinitely so timing out UI-less
  167. # servers is useful.
  168. if not self.haveui and not self.timeout and (self.lastui + self.maxuiwait) < time.time():
  169. print("No UI connection within max timeout, exiting to avoid infinite loop.")
  170. self.quit = True
  171. if self.command_channel in ready:
  172. try:
  173. command = self.command_channel.get()
  174. except EOFError:
  175. # Client connection shutting down
  176. ready = []
  177. disconnect_client(self, fds)
  178. continue
  179. if command[0] == "terminateServer":
  180. self.quit = True
  181. continue
  182. try:
  183. print("Running command %s" % command)
  184. self.command_channel_reply.send(self.cooker.command.runCommand(command))
  185. except Exception as e:
  186. logger.exception('Exception in server main event loop running command %s (%s)' % (command, str(e)))
  187. if self.xmlrpc in ready:
  188. self.xmlrpc.handle_requests()
  189. ready = self.idle_commands(.1, fds)
  190. print("Exiting")
  191. # Remove the socket file so we don't get any more connections to avoid races
  192. try:
  193. os.unlink(self.sockname)
  194. except:
  195. pass
  196. self.sock.close()
  197. try:
  198. self.cooker.shutdown(True)
  199. self.cooker.notifier.stop()
  200. self.cooker.confignotifier.stop()
  201. except:
  202. pass
  203. self.cooker.post_serve()
  204. # Finally release the lockfile but warn about other processes holding it open
  205. lock = self.bitbake_lock
  206. lockfile = lock.name
  207. lock.close()
  208. lock = None
  209. while not lock:
  210. with bb.utils.timeout(3):
  211. lock = bb.utils.lockfile(lockfile, shared=False, retry=False, block=True)
  212. if lock:
  213. # We hold the lock so we can remove the file (hide stale pid data)
  214. # via unlockfile.
  215. bb.utils.unlockfile(lock)
  216. return
  217. if not lock:
  218. # Some systems may not have lsof available
  219. procs = None
  220. try:
  221. procs = subprocess.check_output(["lsof", '-w', lockfile], stderr=subprocess.STDOUT)
  222. except OSError as e:
  223. if e.errno != errno.ENOENT:
  224. raise
  225. if procs is None:
  226. # Fall back to fuser if lsof is unavailable
  227. try:
  228. procs = subprocess.check_output(["fuser", '-v', lockfile], stderr=subprocess.STDOUT)
  229. except OSError as e:
  230. if e.errno != errno.ENOENT:
  231. raise
  232. msg = "Delaying shutdown due to active processes which appear to be holding bitbake.lock"
  233. if procs:
  234. msg += ":\n%s" % str(procs)
  235. print(msg)
  236. def idle_commands(self, delay, fds=None):
  237. nextsleep = delay
  238. if not fds:
  239. fds = []
  240. for function, data in list(self._idlefuns.items()):
  241. try:
  242. retval = function(self, data, False)
  243. if retval is False:
  244. del self._idlefuns[function]
  245. nextsleep = None
  246. elif retval is True:
  247. nextsleep = None
  248. elif isinstance(retval, float) and nextsleep:
  249. if (retval < nextsleep):
  250. nextsleep = retval
  251. elif nextsleep is None:
  252. continue
  253. else:
  254. fds = fds + retval
  255. except SystemExit:
  256. raise
  257. except Exception as exc:
  258. if not isinstance(exc, bb.BBHandledException):
  259. logger.exception('Running idle function')
  260. del self._idlefuns[function]
  261. self.quit = True
  262. # Create new heartbeat event?
  263. now = time.time()
  264. if now >= self.next_heartbeat:
  265. # We might have missed heartbeats. Just trigger once in
  266. # that case and continue after the usual delay.
  267. self.next_heartbeat += self.heartbeat_seconds
  268. if self.next_heartbeat <= now:
  269. self.next_heartbeat = now + self.heartbeat_seconds
  270. heartbeat = bb.event.HeartbeatEvent(now)
  271. bb.event.fire(heartbeat, self.cooker.data)
  272. if nextsleep and now + nextsleep > self.next_heartbeat:
  273. # Shorten timeout so that we we wake up in time for
  274. # the heartbeat.
  275. nextsleep = self.next_heartbeat - now
  276. if nextsleep is not None:
  277. if self.xmlrpc:
  278. nextsleep = self.xmlrpc.get_timeout(nextsleep)
  279. try:
  280. return select.select(fds,[],[],nextsleep)[0]
  281. except InterruptedError:
  282. # Ignore EINTR
  283. return []
  284. else:
  285. return select.select(fds,[],[],0)[0]
  286. class ServerCommunicator():
  287. def __init__(self, connection, recv):
  288. self.connection = connection
  289. self.recv = recv
  290. def runCommand(self, command):
  291. self.connection.send(command)
  292. if not self.recv.poll(30):
  293. logger.info("No reply from server in 30s")
  294. if not self.recv.poll(30):
  295. raise ProcessTimeout("Timeout while waiting for a reply from the bitbake server (60s)")
  296. return self.recv.get()
  297. def updateFeatureSet(self, featureset):
  298. _, error = self.runCommand(["setFeatures", featureset])
  299. if error:
  300. logger.error("Unable to set the cooker to the correct featureset: %s" % error)
  301. raise BaseException(error)
  302. def getEventHandle(self):
  303. handle, error = self.runCommand(["getUIHandlerNum"])
  304. if error:
  305. logger.error("Unable to get UI Handler Number: %s" % error)
  306. raise BaseException(error)
  307. return handle
  308. def terminateServer(self):
  309. self.connection.send(['terminateServer'])
  310. return
  311. class BitBakeProcessServerConnection(object):
  312. def __init__(self, ui_channel, recv, eq, sock):
  313. self.connection = ServerCommunicator(ui_channel, recv)
  314. self.events = eq
  315. # Save sock so it doesn't get gc'd for the life of our connection
  316. self.socket_connection = sock
  317. def terminate(self):
  318. self.socket_connection.close()
  319. self.connection.connection.close()
  320. self.connection.recv.close()
  321. return
class BitBakeServer(object):
    """Start the bitbake cooker server as a daemon and wait for it to be ready.

    All of the work happens in __init__.  It creates the control socket, then
    starts the daemon through bb.daemonize.createDaemon(), which runs
    _startServer.  Finally it blocks until the daemon sends "r" over a pipe.
    If that never happens, this session's part of the daemon log is reported
    and SystemExit(1) is raised.
    """
    # Marker line _startServer prints first; __init__ turns it into a regex to
    # locate this session's output in the daemon log.
    start_log_format = '--- Starting bitbake server pid %s at %s ---'
    start_log_datetime_format = '%Y-%m-%d %H:%M:%S.%f'

    def __init__(self, lock, sockname, configuration, featureset):
        """Launch the server daemon and wait for its readiness message.

        lock          -- open bitbake lock file; the daemon log is placed in
                         the same directory, and our copy is closed here.
        sockname      -- path for the AF_UNIX control socket (replaced if it
                         already exists).
        configuration -- passed to BBCooker; also supplies server_timeout and
                         xmlrpcinterface for the server.
        featureset    -- passed to BBCooker.

        Raises SystemExit(1) if the server does not report readiness.
        """

        self.configuration = configuration
        self.featureset = featureset
        self.sockname = sockname
        self.bitbake_lock = lock
        # The daemon writes "r" to readypipein once the cooker is constructed;
        # we read it from readypipe.
        self.readypipe, self.readypipein = os.pipe()

        # Create server control socket
        if os.path.exists(sockname):
            os.unlink(sockname)

        # Place the log in the builddirectory alongside the lock file
        logfile = os.path.join(os.path.dirname(self.bitbake_lock.name), "bitbake-cookerdaemon.log")

        self.sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
        # AF_UNIX has path length issues so chdir here to workaround
        cwd = os.getcwd()
        try:
            os.chdir(os.path.dirname(sockname))
            self.sock.bind(os.path.basename(sockname))
        finally:
            os.chdir(cwd)
        self.sock.listen(1)

        # The daemon must be able to inherit the listening socket.
        os.set_inheritable(self.sock.fileno(), True)
        startdatetime = datetime.datetime.now()
        bb.daemonize.createDaemon(self._startServer, logfile)
        # From here on we are the launching side: drop our copies of the
        # socket, the lock and the pipe's write end.
        self.sock.close()
        self.bitbake_lock.close()
        os.close(self.readypipein)

        ready = ConnectionReader(self.readypipe)
        # Allow 5s, then up to a further 90s, for the readiness message.
        r = ready.poll(5)
        if not r:
            bb.note("Bitbake server didn't start within 5 seconds, waiting for 90")
            r = ready.poll(90)
        if r:
            try:
                r = ready.get()
            except EOFError:
                # Trap the child exiting/closing the pipe and error out
                r = None
        if not r or r[0] != "r":
            ready.close()
            bb.error("Unable to start bitbake server (%s)" % str(r))
            if os.path.exists(logfile):
                # This session's output starts at the first start marker
                # stamped at or after startdatetime.  Until it is found, keep
                # the last 60 lines seen as a fallback.
                logstart_re = re.compile(self.start_log_format % ('([0-9]+)', '([0-9-]+ [0-9:.]+)'))
                started = False
                lines = []
                lastlines = []
                with open(logfile, "r") as f:
                    for line in f:
                        if started:
                            lines.append(line)
                        else:
                            lastlines.append(line)
                            res = logstart_re.match(line.rstrip())
                            if res:
                                ldatetime = datetime.datetime.strptime(res.group(2), self.start_log_datetime_format)
                                if ldatetime >= startdatetime:
                                    started = True
                                    lines.append(line)
                        if len(lastlines) > 60:
                            lastlines = lastlines[-60:]
                if lines:
                    if len(lines) > 60:
                        bb.error("Last 60 lines of server log for this session (%s):\n%s" % (logfile, "".join(lines[-60:])))
                    else:
                        bb.error("Server log for this session (%s):\n%s" % (logfile, "".join(lines)))
                elif lastlines:
                        bb.error("Server didn't start, last 60 loglines (%s):\n%s" % (logfile, "".join(lastlines)))
            else:
                bb.error("%s doesn't exist" % logfile)

            raise SystemExit(1)

        ready.close()

    def _startServer(self):
        """Daemon-side body passed to bb.daemonize.createDaemon() by __init__.

        Prints the start marker, builds the cooker and reports readiness ("r")
        over the pipe, then starts the ProcessServer.  If BBCooker raises
        bb.BBHandledException, it returns without sending "r".  In that case
        the launcher presumably sees EOF once this process's copy of the pipe
        is closed.
        """
        print(self.start_log_format % (os.getpid(), datetime.datetime.now().strftime(self.start_log_datetime_format)))
        sys.stdout.flush()

        server = ProcessServer(self.bitbake_lock, self.sock, self.sockname)
        self.configuration.setServerRegIdleCallback(server.register_idle_function)
        # This side only writes to the readiness pipe.
        os.close(self.readypipe)
        writer = ConnectionWriter(self.readypipein)
        try:
            self.cooker = bb.cooker.BBCooker(self.configuration, self.featureset)
        except bb.BBHandledException:
            return None
        writer.send("r")
        writer.close()
        # ProcessServer.run() reads these three attributes.
        server.cooker = self.cooker
        server.server_timeout = self.configuration.server_timeout
        server.xmlrpcinterface = self.configuration.xmlrpcinterface
        print("Started bitbake server pid %d" % os.getpid())
        sys.stdout.flush()

        server.start()
  414. def connectProcessServer(sockname, featureset):
  415. # Connect to socket
  416. sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
  417. # AF_UNIX has path length issues so chdir here to workaround
  418. cwd = os.getcwd()
  419. readfd = writefd = readfd1 = writefd1 = readfd2 = writefd2 = None
  420. eq = command_chan_recv = command_chan = None
  421. sock.settimeout(10)
  422. try:
  423. try:
  424. os.chdir(os.path.dirname(sockname))
  425. finished = False
  426. while not finished:
  427. try:
  428. sock.connect(os.path.basename(sockname))
  429. finished = True
  430. except IOError as e:
  431. if e.errno == errno.EWOULDBLOCK:
  432. pass
  433. raise
  434. finally:
  435. os.chdir(cwd)
  436. # Send an fd for the remote to write events to
  437. readfd, writefd = os.pipe()
  438. eq = BBUIEventQueue(readfd)
  439. # Send an fd for the remote to recieve commands from
  440. readfd1, writefd1 = os.pipe()
  441. command_chan = ConnectionWriter(writefd1)
  442. # Send an fd for the remote to write commands results to
  443. readfd2, writefd2 = os.pipe()
  444. command_chan_recv = ConnectionReader(readfd2)
  445. sendfds(sock, [writefd, readfd1, writefd2])
  446. server_connection = BitBakeProcessServerConnection(command_chan, command_chan_recv, eq, sock)
  447. # Close the ends of the pipes we won't use
  448. for i in [writefd, readfd1, writefd2]:
  449. os.close(i)
  450. server_connection.connection.updateFeatureSet(featureset)
  451. except (Exception, SystemExit) as e:
  452. if command_chan_recv:
  453. command_chan_recv.close()
  454. if command_chan:
  455. command_chan.close()
  456. for i in [writefd, readfd1, writefd2]:
  457. try:
  458. if i:
  459. os.close(i)
  460. except OSError:
  461. pass
  462. sock.close()
  463. raise
  464. return server_connection
  465. def sendfds(sock, fds):
  466. '''Send an array of fds over an AF_UNIX socket.'''
  467. fds = array.array('i', fds)
  468. msg = bytes([len(fds) % 256])
  469. sock.sendmsg([msg], [(socket.SOL_SOCKET, socket.SCM_RIGHTS, fds)])
  470. def recvfds(sock, size):
  471. '''Receive an array of fds over an AF_UNIX socket.'''
  472. a = array.array('i')
  473. bytes_size = a.itemsize * size
  474. msg, ancdata, flags, addr = sock.recvmsg(1, socket.CMSG_LEN(bytes_size))
  475. if not msg and not ancdata:
  476. raise EOFError
  477. try:
  478. if len(ancdata) != 1:
  479. raise RuntimeError('received %d items of ancdata' %
  480. len(ancdata))
  481. cmsg_level, cmsg_type, cmsg_data = ancdata[0]
  482. if (cmsg_level == socket.SOL_SOCKET and
  483. cmsg_type == socket.SCM_RIGHTS):
  484. if len(cmsg_data) % a.itemsize != 0:
  485. raise ValueError
  486. a.frombytes(cmsg_data)
  487. assert len(a) % 256 == msg[0]
  488. return list(a)
  489. except (ValueError, IndexError):
  490. pass
  491. raise RuntimeError('Invalid data received')
  492. class BBUIEventQueue:
  493. def __init__(self, readfd):
  494. self.eventQueue = []
  495. self.eventQueueLock = threading.Lock()
  496. self.eventQueueNotify = threading.Event()
  497. self.reader = ConnectionReader(readfd)
  498. self.t = threading.Thread()
  499. self.t.setDaemon(True)
  500. self.t.run = self.startCallbackHandler
  501. self.t.start()
  502. def getEvent(self):
  503. self.eventQueueLock.acquire()
  504. if len(self.eventQueue) == 0:
  505. self.eventQueueLock.release()
  506. return None
  507. item = self.eventQueue.pop(0)
  508. if len(self.eventQueue) == 0:
  509. self.eventQueueNotify.clear()
  510. self.eventQueueLock.release()
  511. return item
  512. def waitEvent(self, delay):
  513. self.eventQueueNotify.wait(delay)
  514. return self.getEvent()
  515. def queue_event(self, event):
  516. self.eventQueueLock.acquire()
  517. self.eventQueue.append(event)
  518. self.eventQueueNotify.set()
  519. self.eventQueueLock.release()
  520. def send_event(self, event):
  521. self.queue_event(pickle.loads(event))
  522. def startCallbackHandler(self):
  523. bb.utils.set_process_name("UIEventQueue")
  524. while True:
  525. try:
  526. self.reader.wait()
  527. event = self.reader.get()
  528. self.queue_event(event)
  529. except EOFError:
  530. # Easiest way to exit is to close the file descriptor to cause an exit
  531. break
  532. self.reader.close()
  533. class ConnectionReader(object):
  534. def __init__(self, fd):
  535. self.reader = multiprocessing.connection.Connection(fd, writable=False)
  536. self.rlock = multiprocessing.Lock()
  537. def wait(self, timeout=None):
  538. return multiprocessing.connection.wait([self.reader], timeout)
  539. def poll(self, timeout=None):
  540. return self.reader.poll(timeout)
  541. def get(self):
  542. with self.rlock:
  543. res = self.reader.recv_bytes()
  544. return multiprocessing.reduction.ForkingPickler.loads(res)
  545. def fileno(self):
  546. return self.reader.fileno()
  547. def close(self):
  548. return self.reader.close()
  549. class ConnectionWriter(object):
  550. def __init__(self, fd):
  551. self.writer = multiprocessing.connection.Connection(fd, readable=False)
  552. self.wlock = multiprocessing.Lock()
  553. # Why bb.event needs this I have no idea
  554. self.event = self
  555. def send(self, obj):
  556. obj = multiprocessing.reduction.ForkingPickler.dumps(obj)
  557. with self.wlock:
  558. self.writer.send_bytes(obj)
  559. def fileno(self):
  560. return self.writer.fileno()
  561. def close(self):
  562. return self.writer.close()