diff --git a/ranking/src/main/java/youthfi/ranking/config/TopologyConfig.java b/ranking/src/main/java/youthfi/ranking/config/TopologyConfig.java index d9554ec..fa85978 100644 --- a/ranking/src/main/java/youthfi/ranking/config/TopologyConfig.java +++ b/ranking/src/main/java/youthfi/ranking/config/TopologyConfig.java @@ -1,7 +1,6 @@ package youthfi.ranking.config; import com.fasterxml.jackson.databind.ObjectMapper; -import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.streams.StreamsBuilder; @@ -12,11 +11,10 @@ import org.springframework.context.annotation.Configuration; import youthfi.ranking.model.ExecutionRow; import youthfi.ranking.model.RankItem; -import youthfi.ranking.transformer.RealizedRateFifoTransformer; +import youthfi.ranking.transformer.BaselineRateTransformer; import youthfi.ranking.transformer.TopNTransformer; import youthfi.ranking.util.DebeziumParser; -import java.nio.charset.StandardCharsets; import java.util.List; @Configuration @@ -30,47 +28,49 @@ public class TopologyConfig { @Bean public KStream kStream(StreamsBuilder builder) { - // 0) execution 파싱 (키 = userId|stockId) + // 1) execution 파싱 KStream execStream = builder .stream(executionTopic, Consumed.with(Serdes.String(), Serdes.String())) .mapValues(DebeziumParser::parseExecution) - .filter((k,v) -> v != null) - .selectKey((k,v) -> v.getUserId() + "|" + v.getStockId()); + .filter((k,v) -> v != null); - // 1) BUY 롯 상태 스토어 (키=userId|stockId, 값=Deque 직렬화 바이트) - var lotsStoreBuilder = Stores.keyValueStoreBuilder( - Stores.persistentKeyValueStore("buy-lots"), - Serdes.String(), Serdes.ByteArray()); - builder.addStateStore(lotsStoreBuilder); + // 2) userId로 rekey (같은 유저 이벤트 순서 보장) + KStream byUser = execStream + .selectKey((k,v) -> v.getUserId()); - // 2) FIFO 처리 → SELL 시 실현 수익률(Double) 방출 - KStream realizedRatePerTrade = - execStream.transformValues( - () -> new RealizedRateFifoTransformer("buy-lots"), - "buy-lots" - ).filter((k,v) -> v != null); + // 3) baseline 상태 스토어 (userId -> baseline cash) + var baselineStore = Stores.keyValueStoreBuilder( + Stores.persistentKeyValueStore("baseline-cash-store"), + Serdes.String(), Serdes.Double() + ); + builder.addStateStore(baselineStore); - // 3) 유저별 최신 실현 수익률 유지 - KTable userLatestRate = realizedRatePerTrade - .selectKey((userStockKey, rate) -> userStockKey.split("\\|", 2)[0]) // userId + // 4) 매도 시점의 수익률 계산 + KStream userRate = byUser + .transformValues(() -> new BaselineRateTransformer("baseline-cash-store"), + "baseline-cash-store") + .filter((userId, rate) -> rate != null); + + // 5) 유저별 최신 수익률 유지 + KTable latestUserRate = userRate .groupByKey(Grouped.with(Serdes.String(), Serdes.Double())) .reduce((oldV, newV) -> newV, - Materialized.>as("user-latest-realized-rate") + Materialized.>as("user-latest-rate") .withKeySerde(Serdes.String()) .withValueSerde(Serdes.Double()) ); - // 4) Top10 계산 (StateStore) + // 6) Top10 계산 (StateStore) var topStoreBuilder = Stores.keyValueStoreBuilder( Stores.persistentKeyValueStore("top10-store"), Serdes.String(), Serdes.Double()); builder.addStateStore(topStoreBuilder); - KStream> top10 = userLatestRate + KStream> top10 = latestUserRate .toStream() .transformValues(() -> new TopNTransformer("top10-store"), "top10-store"); - // 5) JSON 직렬화 후 발행 + // 7) JSON 직렬화 후 발행 top10 .mapValues(list -> { try { return M.writeValueAsString(list); } @@ -79,33 +79,7 @@ public KStream kStream(StreamsBuilder builder) { .selectKey((k,v) -> "TOP10") .to(outTopic, Produced.with(Serdes.String(), Serdes.String())); + // optional: 디버깅용 리턴 return builder.stream(outTopic, Consumed.with(Serdes.String(), Serdes.String())); } - - // ---- Serde: ExecutionRow ---- - private Serde executionSerde() { - var ser = new org.apache.kafka.common.serialization.Serializer() { - @Override public byte[] serialize(String topic, ExecutionRow d) { - if (d == null) return null; - String s = d.getUserId() + "," + d.getStockId() + "," + d.getPrice() - + "," + d.getIsBuy() + "," + d.getQuantity() + "," + d.getTsMs(); - return s.getBytes(StandardCharsets.UTF_8); - } - }; - var de = new org.apache.kafka.common.serialization.Deserializer() { - @Override public ExecutionRow deserialize(String topic, byte[] bytes) { - if (bytes == null) return null; - String[] p = new String(bytes, StandardCharsets.UTF_8).split(","); - return new ExecutionRow( - p[0], // userId - p[1], // stockId - Double.parseDouble(p[2]), // price - Integer.parseInt(p[3]), // isBuy - Long.parseLong(p[4]), // quantity - (p.length >= 6 ? Long.parseLong(p[5]) : System.currentTimeMillis()) // tsMs - ); - } - }; - return Serdes.serdeFrom(ser, de); - } -} \ No newline at end of file +} diff --git a/ranking/src/main/java/youthfi/ranking/model/ExecutionRow.java b/ranking/src/main/java/youthfi/ranking/model/ExecutionRow.java index aaf04ab..03c164e 100644 --- a/ranking/src/main/java/youthfi/ranking/model/ExecutionRow.java +++ b/ranking/src/main/java/youthfi/ranking/model/ExecutionRow.java @@ -30,4 +30,7 @@ public class ExecutionRow { @JsonProperty("ts_ms") private long tsMs; + @JsonProperty("user_balance_snapshot") + private Double userBalanceSnapshot; + } diff --git a/ranking/src/main/java/youthfi/ranking/transformer/BaselineRateTransformer.java b/ranking/src/main/java/youthfi/ranking/transformer/BaselineRateTransformer.java new file mode 100644 index 0000000..6fa3032 --- /dev/null +++ b/ranking/src/main/java/youthfi/ranking/transformer/BaselineRateTransformer.java @@ -0,0 +1,44 @@ +package youthfi.ranking.transformer; + +import org.apache.kafka.streams.kstream.ValueTransformerWithKey; +import org.apache.kafka.streams.processor.ProcessorContext; +import org.apache.kafka.streams.state.KeyValueStore; +import youthfi.ranking.model.ExecutionRow; + +public class BaselineRateTransformer implements ValueTransformerWithKey { + private final String storeName; + private KeyValueStore baselineStore; + + private static final double FIRST_BASELINE = 10_000_000.0; // 최초 1천만원 + + public BaselineRateTransformer(String storeName) { this.storeName = storeName; } + + @Override @SuppressWarnings("unchecked") + public void init(ProcessorContext context) { + this.baselineStore = (KeyValueStore) context.getStateStore(storeName); + } + + @Override + public Double transform(String userId, ExecutionRow e) { + if (e == null || userId == null) return null; + + // 매도 + 잔고 스냅샷 있는 경우만 + if (e.getIsBuy() != 0) return null; + if (e.getUserBalanceSnapshot() == null) return null; + + double cashAfter = e.getUserBalanceSnapshot(); + Double base = baselineStore.get(userId); + if (base == null || base <= 0.0) base = FIRST_BASELINE; + + double rate = (cashAfter - base) / base * 100.0; + + // 다음 매도 대비 baseline 갱신 + baselineStore.put(userId, cashAfter); + + // NaN / Infinity 방지 + if (Double.isNaN(rate) || Double.isInfinite(rate)) return null; + return rate; + } + + @Override public void close() {} +} diff --git a/ranking/src/main/java/youthfi/ranking/util/DebeziumParser.java b/ranking/src/main/java/youthfi/ranking/util/DebeziumParser.java index 3b56a99..8056972 100644 --- a/ranking/src/main/java/youthfi/ranking/util/DebeziumParser.java +++ b/ranking/src/main/java/youthfi/ranking/util/DebeziumParser.java @@ -30,7 +30,7 @@ public static ExecutionRow parseExecution(String v){ JsonNode after = payload.path("after"); if (after.isMissingNode() || after.isNull()) return null; - // userId / stockId (snake/camel 모두 지원) + // userId / stockId String userId = after.hasNonNull("user_id") ? after.path("user_id").asText() : after.hasNonNull("userId") ? String.valueOf(after.path("userId").asLong()) : null; @@ -46,37 +46,41 @@ public static ExecutionRow parseExecution(String v){ : Long.parseLong(after.path("quantity").asText("0")); if (quantity <= 0) return null; - // price: 문자열/숫자 모두 처리 + // price BigDecimal priceDec = after.path("price").isNumber() ? after.path("price").decimalValue() : new BigDecimal(after.path("price").asText("0")); double price = priceDec.doubleValue(); if (price <= 0) return null; - // trade_at: µs(int64) 또는 문자열 → ms로 정규화 + // trade_at: µs(int64) 또는 문자열 → ms if (after.has("trade_at")) { JsonNode t = after.path("trade_at"); if (t.isNumber()) { - long micros = t.asLong(); // e.g. 1760732159312309 - tsMs = micros / 1000L; // µs → ms + long micros = t.asLong(); + tsMs = micros / 1000L; } else { String s = t.asText(); try { - // "2025-10-17 11:15:59[.n]" 같은 포맷 지원 LocalDateTime ldt = LocalDateTime.parse(s, DT_MICRO); tsMs = ldt.toInstant(ZoneOffset.UTC).toEpochMilli(); - } catch (Exception ignore) { - // 실패 시 payload.ts_ms 또는 현재시각 유지 - } + } catch (Exception ignore) {} } } else if (after.has("tradeAt")) { tsMs = after.path("tradeAt").asLong(); // ms 가정 } - return new ExecutionRow(userId, stockId, price, isBuy, quantity, tsMs); + // 새로 추가: user_balance_snapshot (nullable) + Double cashAfter = null; + if (after.has("user_balance_snapshot") && !after.get("user_balance_snapshot").isNull()) { + JsonNode n = after.get("user_balance_snapshot"); + BigDecimal dec = n.isNumber() ? n.decimalValue() : new BigDecimal(n.asText("0")); + cashAfter = dec.doubleValue(); + } + + return new ExecutionRow(userId, stockId, price, isBuy, quantity, tsMs, cashAfter); } catch (Exception e) { return null; } } - } diff --git a/ranking/src/main/resources/application.yaml b/ranking/src/main/resources/application.yaml index 15f8e3e..912d14e 100644 --- a/ranking/src/main/resources/application.yaml +++ b/ranking/src/main/resources/application.yaml @@ -11,7 +11,7 @@ spring: username="${KAFKA_USERNAME}" password="${KAFKA_PASSWORD}"; streams: - application-id: profit-ranking-app-v2 + application-id: profit-ranking-app-v3 state-dir: /tmp/kafka-streams replication-factor: 1 properties: