Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,11 @@ public final class ManagerMessages {
public static final String AFTER_THIS_SUCCESSFUL_SYNC_IF_SUBSCRIPTIONINFO_IS_EMPTY_DURING_THIS =
"After this successful sync, if SubscriptionInfo is empty during this sync and has not been modified afterwards, all subsequent syncs will be skipped";
public static final String ATTEMPT_TO_REPORT_PIPE_EXCEPTION_TO_A_NULL_PIPETASKMETA =
"Attempt to report pipe exception to a null PipeTaskMeta.";
"尝试向空的 PipeTaskMeta 上报 pipe 异常。";
public static final String AUTH_RUN_AUTH_PLAN = "Auth: run auth plan: {}";
public static final String CLUSTERID = "clusterID: {}";
public static final String COLLECTING_PIPE_HEARTBEAT_FROM_DATA_NODES =
"Collecting pipe heartbeat {} from data nodes";
"正在从 data nodes 收集 pipe 心跳 {}";
public static final String CONNECTION_FROM_DATANODE_TO_DATANODE_IS_BROKEN =
"Connection from DataNode {} to DataNode {} is broken";
public static final String CONSENSUSGROUPSTATISTICS = "[ConsensusGroupStatistics]\t {}: {} -> {}";
Expand Down Expand Up @@ -128,7 +128,7 @@ public final class ManagerMessages {
public static final String FAILED_TO_CREATE_PEER_FOR_CONSENSUS_GROUP =
"Failed to create peer for consensus group";
public static final String FAILED_TO_CREATE_PIPE_RESULT_STATUS =
"Failed to create pipe {}. Result status: {}.";
"创建 pipe {} 失败。结果状态:{}。";
public static final String FAILED_TO_CREATE_SUBTASK_FOR_PIPE_CREATION_TIME =
"Failed to create subtask for pipe %s, creation time %d";
public static final String FAILED_TO_CREATE_TOPIC_WITH_ATTRIBUTES_RESULT_STATUS =
Expand All @@ -143,7 +143,7 @@ public final class ManagerMessages {
public static final String FAILED_TO_DEREGISTER_PIPE_TEMPORARY_META_METRICS_PIPETEMPORARYMETA_DOES_NOT =
"Failed to deregister pipe temporary meta metrics, PipeTemporaryMeta({}) does not exist";
public static final String FAILED_TO_DROP_PIPE_RESULT_STATUS =
"Failed to drop pipe {}. Result status: {}.";
"删除 pipe {} 失败。结果状态:{}。";
public static final String FAILED_TO_GET_ALL_PIPE_INFO = "Failed to get all pipe info.";
public static final String FAILED_TO_GET_ALL_SUBSCRIPTION_INFO =
"Failed to get all subscription info.";
Expand All @@ -162,17 +162,17 @@ public final class ManagerMessages {
public static final String FAILED_TO_SHOW_SUBSCRIPTION_INFO = "Failed to show subscription info.";
public static final String FAILED_TO_SHOW_TOPIC_INFO = "Failed to show topic info.";
public static final String FAILED_TO_START_PIPE_RESULT_STATUS =
"Failed to start pipe {}. Result status: {}.";
"启动 pipe {} 失败。结果状态:{}。";
public static final String FAILED_TO_STOP_PIPE_RESULT_STATUS =
"Failed to stop pipe {}. Result status: {}.";
"停止 pipe {} 失败。结果状态:{}。";
public static final String FAILED_TO_SUBMIT_ASYNC_CONSENSUS_PIPE_CREATION_FOR =
"Failed to submit async consensus pipe creation for {}: {}";
public static final String FAILED_TO_SUBMIT_ASYNC_CONSENSUS_PIPE_DROP_FOR =
"Failed to submit async consensus pipe drop for {}: {}";
public static final String FAILED_TO_SYNC_CONSUMER_GROUP_META_RESULT_STATUS =
"Failed to sync consumer group meta. Result status: {}.";
public static final String FAILED_TO_SYNC_PIPE_META_RESULT_STATUS =
"Failed to sync pipe meta. Result status: {}.";
"同步 pipe 元数据失败。结果状态:{}。";
public static final String FAILED_TO_SYNC_TEMPLATE_EXTENSION_INFO_TO_DATANODE =
"Failed to sync template {} extension info to DataNode {}";
public static final String FAILED_TO_SYNC_TOPIC_META_RESULT_STATUS =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,16 @@ public final class DataNodePipeMessages {
"Failed to persist progress index to configNode, status: {}";
public static final String FAILURE_WHEN_REGISTER_PIPE_PLUGIN_SKIP_THIS =
"Failure when register pipe plugin {}. Skip this plugin and continue startup.";
public static final String
FAILED_TO_REGISTER_PIPE_PLUGIN_BECAUSE_NAME_CONFLICTS_WITH_BUILTIN =
"Failed to register PipePlugin %s, because the given PipePlugin name is the same as a built-in PipePlugin name.";
public static final String
FAILED_TO_REGISTER_PIPE_PLUGIN_BECAUSE_INSTANCE_CONSTRUCTION_FAILED =
"Failed to register PipePlugin %s(%s), because its instance can not be constructed successfully. Exception: %s";
public static final String FAILED_TO_REGISTER_PIPE_PLUGIN_BECAUSE_JAR_MD5_MISMATCH =
"Failed to register PipePlugin %s, because existed md5 of jar file for pipe plugin %s is different from the new jar file.";
public static final String FAILED_TO_DEREGISTER_BUILTIN_PIPE_PLUGIN =
"Failed to deregister builtin PipePlugin %s.";
public static final String PIPECONNECTOR = "PipeConnector: ";
public static final String PIPEDATANODETASKBUILDER_FAILED_TO_PARSE_INCLUSION_AND_EXCLUSION =
"PipeDataNodeTaskBuilder failed to parse 'inclusion' and 'exclusion' parameters: {}";
Expand Down Expand Up @@ -439,8 +449,15 @@ public final class DataNodePipeMessages {
public static final String FAILED_TO_START_SOURCES = "failed to start sources.";
public static final String HEARTBEAT_EVENT_CAN_NOT_BE_SUPPLIED_BECAUSE =
"Heartbeat Event {} can not be supplied because the reference count can not be increased";
public static final String EVENT_CAN_NOT_BE_SUPPLIED_BECAUSE_DATA_IS_LOST =
"Event %s can not be supplied because the reference count can not be increased, the data represented by this event is lost";
public static final String INTERRUPTED_WAITING_FOR_PROCESSOR_TO_STOP =
"Interrupted waiting for processor to stop";
public static final String INTERRUPTED_WHEN_WAITING_FOR_PARSING_PRIVILEGE_FOR_TSFILE =
"Interrupted when waiting for parsing privilege for TsFile %s.";
public static final String PARSE_TSFILE_WHEN_CHECKING_PRIVILEGE_ERROR =
"Parse TsFile %s when checking privilege error. Because: %s";
public static final String READ_TSFILE_ERROR = "Read TsFile %s error.";
public static final String IOTDBSCHEMAREGIONSOURCE_DOES_NOT_SUPPORT_TRANSFERRING_EVENTS_UNDER =
"IoTDBSchemaRegionSource does not support transferring events under simple consensus";
public static final String NOT_HAS_PRIVILEGE_TO_TRANSFER_EVENT =
Expand Down Expand Up @@ -826,6 +843,8 @@ public final class DataNodePipeMessages {
public static final String REDIRECT_FILE_POSITION_TO = "Redirect file position to {}.";
public static final String REDIRECT_TO_POSITION_IN_TRANSFERRING_TSFILE =
"Redirect to position {} in transferring tsFile {}.";
public static final String NETWORK_FAILED_TO_RECEIVE_TSFILE_STATUS =
"Network failed to receive tsFile %s, status: %s";
public static final String SECURITY_DIR = "security dir: {}";
public static final String SECURITY_PKI_DIR = "security pki dir: {}";
public static final String SUCCESSFULLY_ADDED_ITEM = "Successfully added item {}.";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,19 +84,19 @@ public final class DataNodePipeMessages {
// ===================== AGENT =====================

public static final String ATTEMPT_TO_REPORT_PIPE_EXCEPTION_TO_A =
"Attempt to report pipe exception to a null PipeTaskMeta.";
"尝试向空的 PipeTaskMeta 上报 pipe 异常。";
public static final String CANNOT_PARSE_REBOOT_TIMES_FROM_FILE_SET =
"无法解析 reboot times from file {}, set the current time in seconds ({}) as the reboot times";
public static final String CANNOT_RECORD_REBOOT_TIMES_TO_FILE_THE =
"无法记录 reboot times {} to file {}, the reboot times will not be updated";
public static final String CANNOT_START_SIMPLEPROGRESSINDEXASSIGNER_BECAUSE_OF =
"无法启动 SimpleProgressIndexAssigner because of {}";
public static final String CREATE_PIPE_DN_TASK_SUCCESSFULLY_WITHIN_MS =
"创建 pipe DN task {} successfully within {} ms";
"创建 pipe DN task {} 成功,耗时 {} ms";
public static final String DEREGISTER_SUBTASK_RUNNINGTASKCOUNT_REGISTEREDTASKCOUNT =
"Deregister subtask {}. runningTaskCount: {}, registeredTaskCount: {}";
"注销子任务 {}。runningTaskCount: {}, registeredTaskCount: {}";
public static final String DROP_PIPE_DN_TASK_SUCCESSFULLY_WITHIN_MS =
"Drop pipe DN task {} successfully within {} ms";
"删除 pipe DN task {} 成功,耗时 {} ms";
public static final String ERROR_OCCURRED_WHEN_COLLECTING_EVENTS_FROM_PROCESSOR =
"collecting events from processor 时发生错误";
public static final String EXCEPTION_IN_PIPE_EVENT_PROCESSING_IGNORED_BECAUSE =
Expand Down Expand Up @@ -132,9 +132,19 @@ public final class DataNodePipeMessages {
"获取 pipe task meta from config node. Ignore the exception 失败,原因:config node may not be "
+ "ready yet, and meta will be pushed by config node later.";
public static final String FAILED_TO_PERSIST_PROGRESS_INDEX_TO_CONFIGNODE =
"persist progress index to configNode, status: {} 失败";
"持久化 progress index configNode 失败,状态:{}";
public static final String FAILURE_WHEN_REGISTER_PIPE_PLUGIN_SKIP_THIS =
"Failure when register pipe plugin {}. Skip this plugin and continue startup.";
"注册 pipe plugin {} 失败。将跳过该插件并继续启动。";
public static final String
FAILED_TO_REGISTER_PIPE_PLUGIN_BECAUSE_NAME_CONFLICTS_WITH_BUILTIN =
"注册 PipePlugin %s 失败,因为给定的 PipePlugin 名称与内置 PipePlugin 名称重复。";
public static final String
FAILED_TO_REGISTER_PIPE_PLUGIN_BECAUSE_INSTANCE_CONSTRUCTION_FAILED =
"注册 PipePlugin %s(%s) 失败,因为其实例无法成功构造。异常:%s";
public static final String FAILED_TO_REGISTER_PIPE_PLUGIN_BECAUSE_JAR_MD5_MISMATCH =
"注册 PipePlugin %s 失败,因为 pipe plugin %s 已存在的 jar 文件 MD5 与新的 jar 文件不同。";
public static final String FAILED_TO_DEREGISTER_BUILTIN_PIPE_PLUGIN =
"注销内置 PipePlugin %s 失败。";
public static final String PIPECONNECTOR = "PipeConnector: ";
public static final String PIPEDATANODETASKBUILDER_FAILED_TO_PARSE_INCLUSION_AND_EXCLUSION =
"PipeDataNodeTaskBuilder failed to parse 'inclusion' and 'exclusion' parameters: {}";
Expand All @@ -157,51 +167,51 @@ public final class DataNodePipeMessages {
"PipeDataNodeTaskBuilder: When the realtime sync is enabled, we enable rate limiter in "
+ "sending tsfile by default to reserve disk and network IO for realtime sending.";
public static final String PIPEEVENTCOLLECTOR_THE_EVENT_IS_ALREADY_RELEASED_SKIPPING =
"PipeEventCollector: The event {} is already released, skipping it.";
"PipeEventCollector:事件 {} 已被释放,跳过处理。";
public static final String PIPE_CONNECTOR_SUBTASK_WAS_CLOSED_WITHIN_MS =
"Pipe:connector subtask {} ({}) 已关闭 within {} ms";
public static final String PIPE_META_NOT_FOUND = "Pipe meta not found: ";
public static final String PIPE_META_NOT_FOUND = "未找到 pipe 元数据:";
public static final String PIPE_SINK_SUBTASKS_WITH_ATTRIBUTES_IS_BOUNDED =
"Pipe sink subtasks with attributes {} is bounded with sinkExecutor {} and "
+ "callbackExecutor {}.";
public static final String PIPE_SKIPPING_TEMPORARY_TSFILE_WHICH_SHOULDN_T =
"Pipe skipping temporary TsFile which shouldn't be transferred: {}";
"Pipe 跳过不应传输的临时 TsFile{}";
public static final String PULLED_PIPE_META_FROM_CONFIG_NODE_RECOVERING =
"Pulled pipe meta from config node: {}, recovering ...";
"已从 config node 拉取 pipe 元数据:{},正在恢复 ...";
public static final String RECEIVED_PIPE_HEARTBEAT_REQUEST_FROM_CONFIG_NODE =
"Received pipe heartbeat request {} from config node.";
"收到来自 config node 的 pipe 心跳请求 {}。";
public static final String REGION_NO_TSFILEINSERTIONEVENTS_TO_REPLACE_FOR_SOURCE =
"Region {}: No TsFileInsertionEvents to replace for source files {}";
public static final String REGION_REPLACED_TSFILEINSERTIONEVENTS_WITH =
"Region {}: Replaced TsFileInsertionEvents {} with {}";
public static final String REGISTEREDTASKCOUNT_0 = "registeredTaskCount < 0";
public static final String REGISTEREDTASKCOUNT_0_1 = "registeredTaskCount <= 0";
public static final String REGISTEREDTASKCOUNT_0 = "registeredTaskCount 小于 0";
public static final String REGISTEREDTASKCOUNT_0_1 = "registeredTaskCount 小于等于 0";
public static final String REGISTER_SUBTASK_RUNNINGTASKCOUNT_REGISTEREDTASKCOUNT =
"Register subtask {}. runningTaskCount: {}, registeredTaskCount: {}";
"注册子任务 {}。runningTaskCount: {}, registeredTaskCount: {}";
public static final String REPORT_PIPERUNTIMEEXCEPTION_TO_LOCAL_PIPETASKMETA_EXCEPTION_MESSAGE =
"Report PipeRuntimeException to local PipeTaskMeta({}), exception message: {}";
public static final String RUNNINGTASKCOUNT_0 = "runningTaskCount < 0";
public static final String RUNNINGTASKCOUNT_0_1 = "runningTaskCount <= 0";
"向本地 PipeTaskMeta({}) 上报 PipeRuntimeException,异常信息:{}";
public static final String RUNNINGTASKCOUNT_0 = "runningTaskCount 小于 0";
public static final String RUNNINGTASKCOUNT_0_1 = "runningTaskCount 小于等于 0";
public static final String SIMPLEPROGRESSINDEXASSIGNER_STARTED_SUCCESSFULLY_ISSIMPLECONSENSUSENABLE_R =
"SimpleProgressIndexAssigner started successfully. isSimpleConsensusEnable: {}, "
"SimpleProgressIndexAssigner 启动成功。isSimpleConsensusEnable: {}, "
+ "rebootTimes: {}";
public static final String STARTING_SIMPLEPROGRESSINDEXASSIGNER =
"Starting SimpleProgressIndexAssigner ...";
"正在启动 SimpleProgressIndexAssigner ...";
public static final String START_PIPE_DN_TASK_SUCCESSFULLY_WITHIN_MS =
"Start pipe DN task {} successfully within {} ms";
"启动 pipe DN task {} 成功,耗时 {} ms";
public static final String START_SUBTASK_RUNNINGTASKCOUNT_REGISTEREDTASKCOUNT =
"Start subtask {}. runningTaskCount: {}, registeredTaskCount: {}";
"启动子任务 {}。runningTaskCount: {}, registeredTaskCount: {}";
public static final String STOP_PIPE_DN_TASK_SUCCESSFULLY_WITHIN_MS =
"Stop pipe DN task {} successfully within {} ms";
"停止 pipe DN task {} 成功,耗时 {} ms";
public static final String STOP_SUBTASK_RUNNINGTASKCOUNT_REGISTEREDTASKCOUNT =
"Stop subtask {}. runningTaskCount: {}, registeredTaskCount: {}";
"停止子任务 {}。runningTaskCount: {}, registeredTaskCount: {}";
public static final String SUBTASK_IS_CLOSED_IGNORE_EXCEPTION =
"subtask {} 已关闭, ignore exception";
public static final String SUBTASK_WORKER_IS_INTERRUPTED = "subtask worker is interrupted";
public static final String SUBTASK_WORKER_IS_INTERRUPTED = "子任务工作线程被中断";
public static final String SUCCESSFULLY_PERSISTED_ALL_PIPE_S_INFO_TO =
"成功 persisted all pipe's info to configNode。";
public static final String THE_EXECUTOR_AND_HAS_BEEN_SUCCESSFULLY_SHUTDOWN =
"The executor {} and {} has been successfully shutdown.";
"执行器 {} {} 已成功关闭。";

// ===================== EVENT =====================

Expand Down Expand Up @@ -422,9 +432,16 @@ public final class DataNodePipeMessages {
"加载 snapshot from byteBuffer {} 失败。";
public static final String FAILED_TO_START_SOURCES = "启动 sources 失败。";
public static final String HEARTBEAT_EVENT_CAN_NOT_BE_SUPPLIED_BECAUSE =
"Heartbeat Event {} can not be supplied because the reference count can not be increased";
"Heartbeat Event {} 无法被提供,因为其引用计数无法增加";
public static final String EVENT_CAN_NOT_BE_SUPPLIED_BECAUSE_DATA_IS_LOST =
"Event %s 无法被提供,因为其引用计数无法增加,事件代表的数据已经丢失";
public static final String INTERRUPTED_WAITING_FOR_PROCESSOR_TO_STOP =
"Interrupted waiting for processor to stop";
"等待 processor 停止时被中断";
public static final String INTERRUPTED_WHEN_WAITING_FOR_PARSING_PRIVILEGE_FOR_TSFILE =
"等待解析 TsFile %s 的权限信息时被中断。";
public static final String PARSE_TSFILE_WHEN_CHECKING_PRIVILEGE_ERROR =
"检查权限时解析 TsFile %s 失败。原因:%s";
public static final String READ_TSFILE_ERROR = "读取 TsFile %s 失败。";
public static final String IOTDBSCHEMAREGIONSOURCE_DOES_NOT_SUPPORT_TRANSFERRING_EVENTS_UNDER =
"IoTDBSchemaRegionSource 不支持 transferring events under simple consensus";
public static final String NOT_HAS_PRIVILEGE_TO_TRANSFER_EVENT = "没有权限 transfer event: ";
Expand Down Expand Up @@ -829,6 +846,8 @@ public final class DataNodePipeMessages {
+ "Peeked event: {}, polled event: {}.";
public static final String THE_FILE_IS_NOT_FOUND_MAY_ALREADY =
"The file {} is not found, may already be deleted.";
public static final String NETWORK_FAILED_TO_RECEIVE_TSFILE_STATUS =
"网络接收 TsFile %s 失败,状态:%s";
public static final String THE_PIPE_WAS_DROPPED_SO_THE_EVENT =
"The pipe {} was dropped so the event ack {} will be ignored.";
public static final String THE_PIPE_WAS_DROPPED_SO_THE_EVENT_1 =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,8 +100,8 @@ private void checkIfRegistered(final PipePluginMeta pipePluginMeta) throws PipeE
if (information.isBuiltin()) {
String errorMessage =
String.format(
"Failed to register PipePlugin %s, because "
+ "the given PipePlugin name is the same as a built-in PipePlugin name.",
DataNodePipeMessages
.FAILED_TO_REGISTER_PIPE_PLUGIN_BECAUSE_NAME_CONFLICTS_WITH_BUILTIN,
pluginName);
LOGGER.warn(errorMessage);
throw new PipeException(errorMessage);
Expand All @@ -113,10 +113,9 @@ private void checkIfRegistered(final PipePluginMeta pipePluginMeta) throws PipeE
&& !PipePluginExecutableManager.getInstance().isLocalJarMatched(pipePluginMeta)) {
String errMsg =
String.format(
"Failed to register PipePlugin %s, because "
+ "existed md5 of jar file for pipe plugin %s "
+ "is different from the new jar file.",
pluginName, pluginName);
DataNodePipeMessages.FAILED_TO_REGISTER_PIPE_PLUGIN_BECAUSE_JAR_MD5_MISMATCH,
pluginName,
pluginName);
LOGGER.warn(errMsg);
throw new PipeException(errMsg);
}
Expand Down Expand Up @@ -170,9 +169,11 @@ public void doRegister(final PipePluginMeta pipePluginMeta) throws PipeException
| ClassCastException e) {
String errorMessage =
String.format(
"Failed to register PipePlugin %s(%s), because "
+ "its instance can not be constructed successfully. Exception: %s",
pluginName.toUpperCase(), className, e);
DataNodePipeMessages
.FAILED_TO_REGISTER_PIPE_PLUGIN_BECAUSE_INSTANCE_CONSTRUCTION_FAILED,
pluginName.toUpperCase(),
className,
e);
LOGGER.warn(errorMessage, e);
throw new PipeException(errorMessage);
}
Expand Down Expand Up @@ -210,7 +211,8 @@ public void deregister(final String pluginName, final boolean needToDeleteJar)

if (information != null && information.isBuiltin()) {
String errorMessage =
String.format("Failed to deregister builtin PipePlugin %s.", pluginName);
String.format(
DataNodePipeMessages.FAILED_TO_DEREGISTER_BUILTIN_PIPE_PLUGIN, pluginName);
LOGGER.warn(errorMessage);
throw new PipeException(errorMessage);
}
Expand Down
Loading
Loading