# Copyright (C) 2012, Daniel Narvaez # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation; either version 2 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program; if not, write to the Free Software # Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA """ UNSTABLE. """ import logging import time import gi gi.require_version('Atspi', '2.0') from gi.repository import Atspi from gi.repository import GLib def get_root(): return Node(Atspi.get_desktop(0)) def _retry_find(func): def wrapped(*args, **kwargs): result = None n_retries = 1 while n_retries <= 50: logging.info("Try %d, name=%s role_name=%s" % (n_retries, kwargs.get("name", None), kwargs.get("role_name", None))) try: result = func(*args, **kwargs) except GLib.GError as e: # The application is not responding, try again if e.code == Atspi.Error.IPC: continue logging.error("GError code %d" % e.code) raise expect_none = kwargs.get("expect_none", False) if (not expect_none and result) or \ (expect_none and not result): return result time.sleep(1) n_retries = n_retries + 1 return result return wrapped class Node: def __init__(self, accessible): self._accessible = accessible def dump(self): lines = [] self._crawl_accessible(self, 0, lines) return "\n".join(lines) def do_action(self, name): for i in range(self._accessible.get_n_actions()): # New, incompatible API if hasattr(self._accessible, "get_action_name"): action_name = self._accessible.get_action_name(i) else: action_name = Atspi.Action.get_name(self._accessible, i) if action_name == name: self._accessible.do_action(i) def click(self, button=1): point = self._accessible.get_position(Atspi.CoordType.SCREEN) Atspi.generate_mouse_event(point.x, point.y, "b%sc" % button) @property def name(self): return self._accessible.get_name() @property def role_name(self): return self._accessible.get_role_name() @property def text(self): return Atspi.Text.get_text(self._accessible, 0, -1) def get_children(self): children = [] for i in range(self._accessible.get_child_count()): child = self._accessible.get_child_at_index(i) # We sometimes get none children from atspi if child is not None: children.append(Node(child)) return children @_retry_find def find_children(self, name=None, role_name=None): def predicate(node): return self._predicate(node, name, role_name) descendants = [] self._find_all_descendants(self, predicate, descendants) if not descendants: return [] return descendants @_retry_find def find_child(self, name=None, role_name=None, expect_none=False): def predicate(node): return self._predicate(node, name, role_name) node = self._find_descendant(self, predicate) if node is None: return None return node def __str__(self): return "[%s | %s]" % (self.name, self.role_name) def _predicate(self, node, name, role_name): if name is not None and name != node.name: return False if role_name is not None and role_name != node.role_name: return False return True def _find_descendant(self, node, predicate): if predicate(node): return node for child in node.get_children(): descendant = self._find_descendant(child, predicate) if descendant is not None: return descendant return None def _find_all_descendants(self, node, predicate, matches): if predicate(node): matches.append(node) for child in node.get_children(): self._find_all_descendants(child, predicate, matches) def _crawl_accessible(self, node, depth, lines): lines.append(" " * depth + str(node)) for child in node.get_children(): self._crawl_accessible(child, depth + 1, lines)