Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
78 changes: 26 additions & 52 deletions ranking/src/main/java/youthfi/ranking/config/TopologyConfig.java
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package youthfi.ranking.config;

import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.StreamsBuilder;
Expand All @@ -12,11 +11,10 @@
import org.springframework.context.annotation.Configuration;
import youthfi.ranking.model.ExecutionRow;
import youthfi.ranking.model.RankItem;
import youthfi.ranking.transformer.RealizedRateFifoTransformer;
import youthfi.ranking.transformer.BaselineRateTransformer;
import youthfi.ranking.transformer.TopNTransformer;
import youthfi.ranking.util.DebeziumParser;

import java.nio.charset.StandardCharsets;
import java.util.List;

@Configuration
Expand All @@ -30,47 +28,49 @@ public class TopologyConfig {
@Bean
public KStream<String, String> kStream(StreamsBuilder builder) {

// 0) execution 파싱 (키 = userId|stockId)
// 1) execution 파싱
KStream<String, ExecutionRow> execStream = builder
.stream(executionTopic, Consumed.with(Serdes.String(), Serdes.String()))
.mapValues(DebeziumParser::parseExecution)
.filter((k,v) -> v != null)
.selectKey((k,v) -> v.getUserId() + "|" + v.getStockId());
.filter((k,v) -> v != null);

// 1) BUY 롯 상태 스토어 (키=userId|stockId, 값=Deque<BuyLot> 직렬화 바이트)
var lotsStoreBuilder = Stores.keyValueStoreBuilder(
Stores.persistentKeyValueStore("buy-lots"),
Serdes.String(), Serdes.ByteArray());
builder.addStateStore(lotsStoreBuilder);
// 2) userId로 rekey (같은 유저 이벤트 순서 보장)
KStream<String, ExecutionRow> byUser = execStream
.selectKey((k,v) -> v.getUserId());

// 2) FIFO 처리 → SELL 시 실현 수익률(Double) 방출
KStream<String, Double> realizedRatePerTrade =
execStream.transformValues(
() -> new RealizedRateFifoTransformer("buy-lots"),
"buy-lots"
).filter((k,v) -> v != null);
// 3) baseline 상태 스토어 (userId -> baseline cash)
var baselineStore = Stores.keyValueStoreBuilder(
Stores.persistentKeyValueStore("baseline-cash-store"),
Serdes.String(), Serdes.Double()
);
builder.addStateStore(baselineStore);

// 3) 유저별 최신 실현 수익률 유지
KTable<String, Double> userLatestRate = realizedRatePerTrade
.selectKey((userStockKey, rate) -> userStockKey.split("\\|", 2)[0]) // userId
// 4) 매도 시점의 수익률 계산
KStream<String, Double> userRate = byUser
.transformValues(() -> new BaselineRateTransformer("baseline-cash-store"),
"baseline-cash-store")
.filter((userId, rate) -> rate != null);

// 5) 유저별 최신 수익률 유지
KTable<String, Double> latestUserRate = userRate
.groupByKey(Grouped.with(Serdes.String(), Serdes.Double()))
.reduce((oldV, newV) -> newV,
Materialized.<String, Double, KeyValueStore<Bytes, byte[]>>as("user-latest-realized-rate")
Materialized.<String, Double, KeyValueStore<Bytes, byte[]>>as("user-latest-rate")
.withKeySerde(Serdes.String())
.withValueSerde(Serdes.Double())
);

// 4) Top10 계산 (StateStore)
// 6) Top10 계산 (StateStore)
var topStoreBuilder = Stores.keyValueStoreBuilder(
Stores.persistentKeyValueStore("top10-store"),
Serdes.String(), Serdes.Double());
builder.addStateStore(topStoreBuilder);

KStream<String, List<RankItem>> top10 = userLatestRate
KStream<String, List<RankItem>> top10 = latestUserRate
.toStream()
.transformValues(() -> new TopNTransformer("top10-store"), "top10-store");

// 5) JSON 직렬화 후 발행
// 7) JSON 직렬화 후 발행
top10
.mapValues(list -> {
try { return M.writeValueAsString(list); }
Expand All @@ -79,33 +79,7 @@ public KStream<String, String> kStream(StreamsBuilder builder) {
.selectKey((k,v) -> "TOP10")
.to(outTopic, Produced.with(Serdes.String(), Serdes.String()));

// optional: 디버깅용 리턴
return builder.stream(outTopic, Consumed.with(Serdes.String(), Serdes.String()));
}

// ---- Serde: ExecutionRow ----
private Serde<ExecutionRow> executionSerde() {
var ser = new org.apache.kafka.common.serialization.Serializer<ExecutionRow>() {
@Override public byte[] serialize(String topic, ExecutionRow d) {
if (d == null) return null;
String s = d.getUserId() + "," + d.getStockId() + "," + d.getPrice()
+ "," + d.getIsBuy() + "," + d.getQuantity() + "," + d.getTsMs();
return s.getBytes(StandardCharsets.UTF_8);
}
};
var de = new org.apache.kafka.common.serialization.Deserializer<ExecutionRow>() {
@Override public ExecutionRow deserialize(String topic, byte[] bytes) {
if (bytes == null) return null;
String[] p = new String(bytes, StandardCharsets.UTF_8).split(",");
return new ExecutionRow(
p[0], // userId
p[1], // stockId
Double.parseDouble(p[2]), // price
Integer.parseInt(p[3]), // isBuy
Long.parseLong(p[4]), // quantity
(p.length >= 6 ? Long.parseLong(p[5]) : System.currentTimeMillis()) // tsMs
);
}
};
return Serdes.serdeFrom(ser, de);
}
}
}
3 changes: 3 additions & 0 deletions ranking/src/main/java/youthfi/ranking/model/ExecutionRow.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,4 +30,7 @@ public class ExecutionRow {
@JsonProperty("ts_ms")
private long tsMs;

@JsonProperty("user_balance_snapshot")
private Double userBalanceSnapshot;

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package youthfi.ranking.transformer;

import org.apache.kafka.streams.kstream.ValueTransformerWithKey;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueStore;
import youthfi.ranking.model.ExecutionRow;

public class BaselineRateTransformer implements ValueTransformerWithKey<String, ExecutionRow, Double> {
private final String storeName;
private KeyValueStore<String, Double> baselineStore;

private static final double FIRST_BASELINE = 10_000_000.0; // 최초 1천만원

public BaselineRateTransformer(String storeName) { this.storeName = storeName; }

@Override @SuppressWarnings("unchecked")
public void init(ProcessorContext context) {
this.baselineStore = (KeyValueStore<String, Double>) context.getStateStore(storeName);
}

@Override
public Double transform(String userId, ExecutionRow e) {
if (e == null || userId == null) return null;

// 매도 + 잔고 스냅샷 있는 경우만
if (e.getIsBuy() != 0) return null;
if (e.getUserBalanceSnapshot() == null) return null;

double cashAfter = e.getUserBalanceSnapshot();
Double base = baselineStore.get(userId);
if (base == null || base <= 0.0) base = FIRST_BASELINE;

double rate = (cashAfter - base) / base * 100.0;

// 다음 매도 대비 baseline 갱신
baselineStore.put(userId, cashAfter);

// NaN / Infinity 방지
if (Double.isNaN(rate) || Double.isInfinite(rate)) return null;
return rate;
}

@Override public void close() {}
}
26 changes: 15 additions & 11 deletions ranking/src/main/java/youthfi/ranking/util/DebeziumParser.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ public static ExecutionRow parseExecution(String v){
JsonNode after = payload.path("after");
if (after.isMissingNode() || after.isNull()) return null;

// userId / stockId (snake/camel 모두 지원)
// userId / stockId
String userId = after.hasNonNull("user_id") ? after.path("user_id").asText()
: after.hasNonNull("userId") ? String.valueOf(after.path("userId").asLong())
: null;
Expand All @@ -46,37 +46,41 @@ public static ExecutionRow parseExecution(String v){
: Long.parseLong(after.path("quantity").asText("0"));
if (quantity <= 0) return null;

// price: 문자열/숫자 모두 처리
// price
BigDecimal priceDec = after.path("price").isNumber()
? after.path("price").decimalValue()
: new BigDecimal(after.path("price").asText("0"));
double price = priceDec.doubleValue();
if (price <= 0) return null;

// trade_at: µs(int64) 또는 문자열 → ms로 정규화
// trade_at: µs(int64) 또는 문자열 → ms
if (after.has("trade_at")) {
JsonNode t = after.path("trade_at");
if (t.isNumber()) {
long micros = t.asLong(); // e.g. 1760732159312309
tsMs = micros / 1000L; // µs → ms
long micros = t.asLong();
tsMs = micros / 1000L;
} else {
String s = t.asText();
try {
// "2025-10-17 11:15:59[.n]" 같은 포맷 지원
LocalDateTime ldt = LocalDateTime.parse(s, DT_MICRO);
tsMs = ldt.toInstant(ZoneOffset.UTC).toEpochMilli();
} catch (Exception ignore) {
// 실패 시 payload.ts_ms 또는 현재시각 유지
}
} catch (Exception ignore) {}
}
} else if (after.has("tradeAt")) {
tsMs = after.path("tradeAt").asLong(); // ms 가정
}

return new ExecutionRow(userId, stockId, price, isBuy, quantity, tsMs);
// 새로 추가: user_balance_snapshot (nullable)
Double cashAfter = null;
if (after.has("user_balance_snapshot") && !after.get("user_balance_snapshot").isNull()) {
JsonNode n = after.get("user_balance_snapshot");
BigDecimal dec = n.isNumber() ? n.decimalValue() : new BigDecimal(n.asText("0"));
cashAfter = dec.doubleValue();
}

return new ExecutionRow(userId, stockId, price, isBuy, quantity, tsMs, cashAfter);
} catch (Exception e) {
return null;
}
}

}
2 changes: 1 addition & 1 deletion ranking/src/main/resources/application.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ spring:
username="${KAFKA_USERNAME}"
password="${KAFKA_PASSWORD}";
streams:
application-id: profit-ranking-app-v2
application-id: profit-ranking-app-v3
state-dir: /tmp/kafka-streams
replication-factor: 1
properties:
Expand Down