Source code for estuary_updater.handlers.errata

# SPDX-License-Identifier: GPL-3.0+

from __future__ import unicode_literals, absolute_import

from estuary.models.errata import Advisory, ContainerAdvisory
from estuary.models.bugzilla import BugzillaBug
from estuary.models.user import User
from estuary.utils.general import timestamp_to_datetime
import requests
import requests_kerberos
import neomodel

from estuary_updater.handlers.base import BaseHandler


[docs]class ErrataHandler(BaseHandler): """A handler for dist-git related messages."""
[docs] @staticmethod def can_handle(msg): """ Determine if this handler can handle this message. :param dict msg: a message to be analyzed :return: a bool based on if the handler can handle this kind of message :rtype: bool """ supported_topics = [ '/topic/VirtualTopic.eng.errata.activity.status', '/topic/VirtualTopic.eng.errata.activity.created', '/topic/VirtualTopic.eng.errata.builds.added', '/topic/VirtualTopic.eng.errata.builds.removed' ] return msg['topic'] in supported_topics
[docs] def handle(self, msg): """ Handle an Errata tool message and update Neo4j if necessary. :param dict msg: a message to be processed """ topic = msg['topic'] advisory_topics = [ '/topic/VirtualTopic.eng.errata.activity.status', '/topic/VirtualTopic.eng.errata.activity.created' ] if topic in advisory_topics: self.advisory_handler(msg) elif topic == '/topic/VirtualTopic.eng.errata.builds.added': self.builds_added_handler(msg) elif topic == '/topic/VirtualTopic.eng.errata.builds.removed': self.builds_removed_handler(msg) else: raise RuntimeError('This message is unable to be handled: {0}'.format(msg))
[docs] def advisory_handler(self, msg): """ Handle an Errata tool advisory changes and update Neo4j if necessary. :param dict msg: a message to be processed """ advisory_id = msg['body']['headers']['errata_id'] erratum_url = '{0}/api/v1/erratum/{1}'.format( self.config['estuary_updater.errata_url'].rstrip('/'), advisory_id) response = requests.get(erratum_url, auth=requests_kerberos.HTTPKerberosAuth(), timeout=10) advisory_json = response.json() advisory_type = msg['body']['headers']['type'].lower() advisory_info = advisory_json['errata'][advisory_type] embargoed = msg['body']['headers']['synopsis'] == 'REDACTED' # We can't store information on embargoed advisories other than the ID if not embargoed: product_url = '{0}/products/{1}.json'.format( self.config['estuary_updater.errata_url'].rstrip('/'), advisory_info['product_id'] ) response = requests.get( product_url, auth=requests_kerberos.HTTPKerberosAuth(), timeout=10) product_json = response.json() reporter_url = '{0}/api/v1/user/{1}'.format( self.config['estuary_updater.errata_url'].rstrip('/'), advisory_info['reporter_id']) response = requests.get( reporter_url, auth=requests_kerberos.HTTPKerberosAuth(), timeout=10) reporter_json = response.json() reporter = User.create_or_update({ 'username': reporter_json['login_name'].split('@')[0], 'email': reporter_json['email_address'] })[0] assigned_to_url = '{0}/api/v1/user/{1}'.format( self.config['estuary_updater.errata_url'].rstrip('/'), advisory_info['assigned_to_id']) response = requests.get( assigned_to_url, auth=requests_kerberos.HTTPKerberosAuth(), timeout=10) assigned_to_json = response.json() assigned_to = User.create_or_update({ 'username': assigned_to_json['login_name'].split('@')[0], 'email': assigned_to_json['email_address'] })[0] advisory_params = { 'advisory_name': advisory_info['fulladvisory'], 'id_': advisory_id, 'product_name': product_json['product']['name'], 'security_impact': advisory_info['security_impact'], 'state': advisory_info['status'], 'synopsis': msg['body']['headers']['synopsis'] } for dt in ('actual_ship_date', 'created_at', 'issue_date', 'release_date', 'security_sla', 'status_updated_at', 'update_date'): if advisory_info[dt]: if dt == 'status_updated_at': estuary_key = 'status_time' else: estuary_key = dt advisory_params[estuary_key] = timestamp_to_datetime(advisory_info[dt]) else: advisory_params = { 'id_': advisory_id, # Set this to REDACTED and it'll be updated when it becomes public 'advisory_name': 'REDACTED' } if 'docker' in advisory_info['content_types']: try: advisory = ContainerAdvisory.create_or_update(advisory_params)[0] except neomodel.exceptions.ConstraintValidationFailed: # This must have errantly been created as an Advisory instead of a # ContainerAdvisory, so let's fix that. advisory = Advisory.nodes.get_or_none(id_=advisory_id) if not advisory: # If there was a constraint validation failure and the advisory isn't just # the wrong label, then we can't recover. raise advisory.add_label(ContainerAdvisory.__label__) advisory = ContainerAdvisory.create_or_update(advisory_params)[0] else: # Check to see if a ContainerAdvisory using this id already exists, and if so remove its # label because it should not be a ContainerAdvisory if docker isn't a content type. container_adv = ContainerAdvisory.nodes.get_or_none(id_=advisory_id) if container_adv: container_adv.remove_label(ContainerAdvisory.__label__) advisory = Advisory.create_or_update(advisory_params)[0] if not embargoed: advisory.conditional_connect(advisory.reporter, reporter) advisory.conditional_connect(advisory.assigned_to, assigned_to) bugs = advisory_json['bugs']['bugs'] for bug in bugs: bug = BugzillaBug.get_or_create({'id_': bug['bug']['id']})[0] advisory.attached_bugs.connect(bug)
[docs] def builds_added_handler(self, msg): """ Handle an Errata tool builds added message and update Neo4j if necessary. :param dict msg: a message to be processed """ embargoed = msg['body']['headers']['brew_build'] == 'REDACTED' # We can't store information on embargoed advisories other than the ID if embargoed: return advisory = Advisory.get_or_create({ 'id_': msg['body']['headers']['errata_id'] })[0] nvr = msg['body']['headers']['brew_build'] koji_build = self.get_or_create_build(nvr) time_attached_string = msg['body']['headers']['when'] if time_attached_string.endswith(' UTC'): time_attached_string = time_attached_string[:-4] time_attached = timestamp_to_datetime(time_attached_string) attached_rel = advisory.attached_builds.relationship(koji_build) if attached_rel: if attached_rel.time_attached != time_attached: advisory.attached_builds.replace(koji_build, {'time_attached': time_attached}) else: advisory.attached_builds.connect(koji_build, {'time_attached': time_attached})
[docs] def builds_removed_handler(self, msg): """ Handle an Errata tool builds removed message and update Neo4j if necessary. :param dict msg: a message to be processed """ embargoed = msg['body']['headers']['brew_build'] == 'REDACTED' # We can't store information on embargoed advisories other than the ID if embargoed: return advisory = Advisory.get_or_create({ 'id_': msg['body']['headers']['errata_id'] })[0] nvr = msg['body']['headers']['brew_build'] koji_build = self.get_or_create_build(nvr) advisory.attached_builds.disconnect(koji_build)