Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 16 additions & 2 deletions mysqloperator/controller/diagnose.py
Original file line number Diff line number Diff line change
Expand Up @@ -290,6 +290,20 @@ class CandidateStatus:
bad_gtid_set: Optional[str] = None


def get_topology_instance_info(cluster_status: dict, pod: MySQLPod) -> Optional[dict]:
    """Return the topology entry describing ``pod``, or None if it is absent.

    ``cluster_status`` must contain ``["defaultReplicaSet"]["topology"]``, a
    mapping keyed by instance endpoint (a missing key raises KeyError).

    - For a ``"group-member"`` pod the entry is looked up directly by
      ``pod.endpoint`` among the topology keys.
    - For a ``"read-replica"`` pod the entry is searched for in the
      ``"readReplicas"`` mapping nested under each topology member; members
      without that key are treated as having no read replicas.

    Raises:
        Exception: if ``pod.instance_type`` is neither of the two known types.
    """
    members = cluster_status["defaultReplicaSet"]["topology"]
    kind = pod.instance_type

    if kind == "group-member":
        return members.get(pod.endpoint)

    if kind != "read-replica":
        raise Exception(f"Unknown instance type for {pod.name}: {pod.instance_type}")

    # Read replicas are not top-level topology keys; each one is listed under
    # the "readReplicas" mapping of some topology member.
    for member_info in members.values():
        replicas = member_info.get("readReplicas", {})
        if pod.endpoint in replicas:
            return replicas[pod.endpoint]
    return None


def check_errant_gtids(primary_session: 'ClassicSession', pod: MySQLPod, pod_dba: 'Dba', logger) -> Optional[str]:
try:
gtid_set = pod_dba.session.run_sql(
Expand Down Expand Up @@ -353,8 +367,8 @@ def diagnose_cluster_candidate(primary_session: 'ClassicSession', cluster: 'Clus
# TODO disable queryMembers
is_member = False
try:
topology = cluster.status()["defaultReplicaSet"]["topology"]
is_member = pod.endpoint in topology.keys()
is_member = get_topology_instance_info(
cluster.status({"extended": 1}), pod) is not None
except RuntimeError as e:
e_str = str(e)
if "bad_alloc" in e_str or "std::bad_alloc" in e_str:
Expand Down
40 changes: 39 additions & 1 deletion mysqloperator/controller/innodbcluster/cluster_controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@
"exitStateAction": "ABORT_SERVER"
}

# Synthetic role used for pod labels/annotations; async read replicas are not GR members.
# probe_read_replica_status() reports this value in the role position of the
# membership tuple it returns, in place of a role queried from the server.
READ_REPLICA_ROLE = "READ_REPLICA"

def select_pod_with_most_gtids(gtids: Dict[int, str]) -> int:
pod_indexes = list(gtids.keys())
pod_indexes.sort(key = lambda a: mysqlutils.count_gtids(gtids[a]))
Expand Down Expand Up @@ -123,7 +126,10 @@ def probe_status_if_needed(self, changed_pod: MySQLPod, logger: Logger) -> diagn

def probe_member_status(self, pod: MySQLPod, session: 'ClassicSession', joined: bool, logger: Logger) -> None:
# TODO use diagnose?
minfo = shellutils.query_membership_info(session)
if pod.instance_type == "read-replica":
minfo = self.probe_read_replica_status(pod, session, logger)
else:
minfo = shellutils.query_membership_info(session)
member_id, role, status, view_id, version, mcount, rmcount = minfo
logger.debug(
f"instance probe: role={role} status={status} view_id={view_id} version={version} members={mcount} reachable_members={rmcount}")
Expand All @@ -137,6 +143,38 @@ def probe_member_status(self, pod: MySQLPod, session: 'ClassicSession', joined:

return minfo

def probe_read_replica_status(self, pod: MySQLPod, session: 'ClassicSession', logger: Logger) -> tuple:
    """Build a membership-info tuple for a read-replica pod.

    The server UUID and version are read from ``session``; the status is
    taken from the pod's entry in the cluster's extended status and falls
    back to ``"OFFLINE"`` when the pod is not listed or reports no status.

    Returns:
        ``(member_id, role, status, view_id, version, mcount, rmcount)``,
        the same order probe_member_status() unpacks. ``role`` is always
        READ_REPLICA_ROLE, ``view_id`` is ``""`` and both member counts are
        ``None``.

    Raises:
        RuntimeError, mysqlsh.Error: re-raised after logging when the
        cluster status query fails.
    """
    uuid_and_version = session.run_sql("SELECT @@server_uuid, @@version").fetch_one()
    member_id = uuid_and_version[0] or ""
    version = uuid_and_version[1] or ""

    # The topology lookup needs a cluster handle; connect if there is none yet.
    if not self.dba_cluster:
        self.connect_to_cluster(logger)
    assert self.dba_cluster

    try:
        cluster_status = self.dba_cluster.status({"extended": 1})
        instance_info = diagnose.get_topology_instance_info(cluster_status, pod)
    except RuntimeError as e:
        # "std::bad_alloc" contains "bad_alloc", so one substring test covers both.
        if "bad_alloc" in str(e):
            logger.warning(f"cluster.status() hit std::bad_alloc while probing read replica {pod.endpoint}: error={e}")
        else:
            logger.info(f"probe_read_replica_status: RuntimeError from status(): {e}")
        raise
    except mysqlsh.Error as e:
        # The error propagates either way; check_fatal()'s result only decides
        # whether the extra info line is logged first.
        if not shellutils.check_fatal(
                e, pod.endpoint_url_safe, "status()", logger):
            logger.info(f"cluster.status() failed while probing read replica {pod.endpoint}: error={e}")
        raise

    reported_status = instance_info.get("status") if instance_info else None
    status = reported_status or "OFFLINE"

    return member_id, READ_REPLICA_ROLE, status, "", version, None, None

def connect_to_primary(self, primary_pod: MySQLPod, logger: Logger) -> 'Cluster':
if primary_pod:
self.dba = shellutils.connect_dba(
Expand Down
Loading