1. Зайти в консоль messaging cd /opt/express && DPL_PULL_POLICY=never dpl --dc exec messaging ./bin/messaging remote_console или docker exec -it {messaging_container} ./bin/messaging remote_console если у вас kubernetes, {cts-messaging} - заменить на имя пода kubectl exec -it {cts-messaging} -- ./bin/messaging remote_console 2. Скопировать код приведенный ниже и нажать Enter defmodule Messaging.Troubleshooting do defmodule Chats do alias CcsSdk.API.Messaging.V1, as: MessagingV1 alias Messaging.Chats.{ChatsContext, GroupChat} alias Messaging.Chats.RoutingContext alias Messaging.Chats.GroupChat.Changeset.ChatServersChangeset alias Messaging.Chats.GroupChat.Commands.ChatSyncPullCommand alias Messaging.Chats.GroupChat.Queries.GetChatsByHuidInMembersQuery alias Messaging.Events.Kafka.EventHandler.Decoders.ChatParamsDecoder alias Messaging.Events.{SystemEventsContext, ChatEventsContext} alias Messaging.Event alias Messaging.Repo require Logger def resync_chat_servers(chat_id) do {:ok, base} = ChatsContext.get_chat_base_from_cache(chat_id) RoutingContext.sync(base.members) {:ok, chat} = ChatsContext.get_chat(chat_id, preload: true) ChatsContext.activate_chat(chat) chat |> ChatServersChangeset.apply() |> Repo.update!(force: true) ChatsContext.clear_chat_cache(chat.id, "chat_sync") end def update_chat_servers(chat_id) do with {:ok, chat} <- ChatsContext.get_chat(chat_id, preload: true) do chat |> ChatServersChangeset.apply() |> Repo.update!(force: true) ChatsContext.clear_chat_cache(chat_id, "chat_sync") end end def refresh_chat(chat_id) do with {:ok, chat_data} <- MessagingV1.sync_data(chat_id), decoded_data = ChatParamsDecoder.decode(chat_data), {:ok, result} <- ChatsContext.sync_chat(decoded_data, []) do produce_chat_updated(result.group_chat) end end def sync_chat(chat_id, server_id) do ChatSyncPullCommand.execute(chat_id, server_id) end def repair(chat_id) do with :ok <- refetch_routing_info_for_chat(chat_id) do produce_chat_updated(chat_id) end end def repair_user(user_huid) do with {:ok, _} <- RoutingContext.sync([user_huid]), {:ok, chats} <- GetChatsByHuidInMembersQuery.execute(user_huid) do Enum.each(chats, fn(chat) -> clear_cache(chat.id) produce_chat_updated(chat) produce_routing_changed(chat, user_huid) end) end end def repair_user_with_chats(user_huid) do with {:ok, _} <- RoutingContext.sync([user_huid]), {:ok, chats} <- GetChatsByHuidInMembersQuery.execute(user_huid) do Enum.each(chats, fn(chat) -> refresh_chat(chat.id) clear_cache(chat.id) end) end end def produce_chat_updated(%GroupChat{} = chat) do with {:ok, chat_updated} <- SystemEventsContext.chat_updated(chat) do Event.emit(chat_updated, chat: chat) {:ok, [id: chat.id]} else error -> Logger.error("Troubleshooting.Chats produce chat updated error: #{chat.id} #{error}") {:error, [id: chat.id, reason: error]} end end def produce_chat_updated(chat_id) do with {:ok, chat} <- ChatsContext.get_chat(chat_id, preload: true) do produce_chat_updated(chat) end end def produce_routing_changed(%GroupChat{} = chat, user_huid) do backward = %GroupChat{chat | chat_member_routing_infos: []} routing_info = Enum.find(chat.chat_member_routing_infos, &(&1.user_huid == user_huid)) forward = %GroupChat{chat | chat_member_routing_infos: [routing_info]} with {:ok, routing_changed} <- ChatEventsContext.routing_changed(backward, forward) do Event.emit(routing_changed, chat: chat) {:ok, [id: chat.id]} else error -> Logger.error("Troubleshooting.Chats produce chat routing changed error: #{chat.id} #{error}") {:error, [id: chat.id, reason: error]} end end def activate_chat(chat_id) do with {:ok, chat} <- ChatsContext.get_chat(chat_id), {:ok, updated_chat} <- ChatsContext.activate_chat(chat) do clear_cache(chat_id) produce_chat_updated(updated_chat) end end def refetch_routing_info_for_chat(chat_id) do {:ok, chat} = ChatsContext.get_chat(chat_id) with {:ok, _} <- RoutingContext.sync(chat.members) do ChatsContext.clear_chat_cache(chat_id, "chat_sync") end end def refetch_routing_info(user_huid) do with {:ok, _} <- RoutingContext.sync([user_huid]), {:ok, chats} <- GetChatsByHuidInMembersQuery.execute(user_huid) do Enum.each(chats, &(ChatsContext.clear_chat_cache(&1.id, "chat_sync"))) end end def clear_cache(chat_id) do ChatsContext.clear_chat_cache(chat_id, "chat_sync") end def validate_chat(chat_id) do {:ok, local_chat} = ChatsContext.get_chat(chat_id, preload: true) {:ok, local_chat_from_cache} = ChatsContext.get_chat_from_cache(chat_id) check1 = validate_chat_member_routing_info( {:local, local_chat.chat_member_routing_infos}, {:local_cache, local_chat_from_cache.chat_member_routing_infos} ) check2 = validate_keys( {:local, ChatsContext.keys(local_chat)}, {:local_cache, local_chat_from_cache.keys} ) [check1, check2] end defp validate_chat_member_routing_info({tag1, cmr_info1}, {tag2, cmr_info2}) do i1 = cmr_info1 |> Enum.map(&map_routing_info/1) |> Enum.sort_by(&(&1.user_huid)) i2 = cmr_info2 |> Enum.map(&map_routing_info/1) |> Enum.sort_by(&(&1.user_huid)) validate_diff(:routing_info, {tag1, i1}, {tag2, i2}) end defp map_routing_info(routing_info) do routing_info |> Map.from_struct() |> Map.take([ :active, :cts_id, :cts_key_id, :ets_id, :rts_id, :rts_key_id, :user_huid, :user_kind ]) end defp validate_keys({tag1, keys1}, {tag2, keys2}) do validate_diff(:keys, {tag1, keys1}, {tag2, keys2}) end defp validate_diff(set_name, {tag1, set1}, {tag2, set2}) do missing1 = case set1 -- set2 do [] -> [] diff -> [{tag2, diff}] end missing2 = case set2 -- set1 do [] -> [] diff -> [{tag1, diff}] end case missing1 ++ missing2 do [] -> {set_name, :ok} missing -> {set_name, {:missing, missing}} end end end end 3. Далее Messaging.Troubleshooting.Chats.repair_user("HUID_пользователя_с_проблемой")